# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
    cast,
)

if TYPE_CHECKING:
    from .file import _GitFile
    from .line_ending import BlobNormalizer
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
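
# A minimal round-trip sketch of the varint helpers above (illustrative only):
# 300 encodes to b"\xac\x02" -- low 7 bits first, with the continuation bit
# set on every byte except the last.
#
#     data = _encode_varint(300)          # b"\xac\x02"
#     value, pos = _decode_varint(data)
#     assert (value, pos) == (300, 2)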


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
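
# Illustrative sketch of v4 path compression against a previous entry:
# b"foo/baz" shares the 6-byte prefix b"foo/ba" with b"foo/bar", so one
# trailing byte is dropped and the new suffix appended.
#
#     data = _compress_path(b"foo/baz", b"foo/bar")       # b"\x01z\x00"
#     path, end = _decompress_path(data, 0, b"foo/bar")
#     assert (path, end) == (b"foo/baz", 3)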


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
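
# Hedged illustration: an entry's merge stage lives in bits 12-13 of its
# flags word, alongside the 12-bit name length. A stage-2 ("this"/ours)
# entry with a 5-byte name would carry:
#
#     flags = (Stage.MERGE_CONFLICT_THIS.value << FLAG_STAGESHIFT) | 5   # 0x2005
#     assert Stage((flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT) is Stage.MERGE_CONFLICT_THIS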


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)


@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.
        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
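
# Quick illustration of the bytes-only path helpers:
#
#     assert pathsplit(b"a/b/c") == (b"a/b", b"c")
#     assert pathsplit(b"plain") == (b"", b"plain")
#     assert pathjoin(b"a/b", b"c") == b"a/b/c"
#     assert pathjoin(b"", b"c") == b"c"      # empty components are dropped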


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries


def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions
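
# Hedged usage sketch: parsing an on-disk index. The path is a placeholder
# and must point at an existing index file for this to run.
#
#     with open(".git/index", "rb") as f:
#         entries, version, extensions = read_index_dict_with_version(f)
#     # entries maps path -> IndexEntry, or ConflictedIndexEntry mid-merge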


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Entries are keyed by path; the stages of a conflicted path are collected
    into a single ConflictedIndexEntry.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # STEP 1: check if any extended_flags are set
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Force or bump the version to 3
        version = 3
    # As a final safety check, make sure no extended flags slip into a
    # pre-v3 index.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)
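
# A hedged round-trip sketch over an in-memory buffer; the zeroed stat fields
# and the all-zero hex sha are arbitrary placeholders, not dulwich defaults.
#
#     import io
#     entry = IndexEntry(
#         ctime=(0, 0), mtime=(0, 0), dev=0, ino=0, mode=0o100644,
#         uid=0, gid=0, size=0, sha=b"0" * 40,
#     )
#     buf = io.BytesIO()
#     write_index_dict(buf, {b"hello.txt": entry})
#     buf.seek(0)
#     assert b"hello.txt" in read_index_dict(buf)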


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
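
# Sketch of how cleanup_mode canonicalizes filesystem modes for tree storage;
# for regular files only the owner-executable bit survives:
#
#     assert cleanup_mode(0o100644) == 0o100644
#     assert cleanup_mode(0o100664) == 0o100644   # group-write stripped
#     assert cleanup_mode(0o100755) == 0o100755
#     assert cleanup_mode(0o120777) == 0o120000   # symlink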


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=self._extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=self._extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
          newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
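
# Minimal usage sketch of the Index class; the path is a placeholder and the
# file must exist (pass read=False to start from an empty index).
#
#     index = Index(".git/index")
#     for path, entry in index.items():
#         print(path, entry.sha if isinstance(entry, IndexEntry) else "conflict")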


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
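
# Hedged usage sketch: building a one-file tree in an in-memory store.
# MemoryObjectStore and Blob.from_string exist elsewhere in dulwich, but this
# snippet is illustrative rather than canonical.
#
#     from dulwich.object_store import MemoryObjectStore
#     store = MemoryObjectStore()
#     blob = Blob.from_string(b"hello\n")
#     store.add_object(blob)
#     tree_id = commit_tree(store, [(b"docs/hello.txt", blob.id, 0o100644)])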


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
      (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)
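
# Hedged example: materializing a blob as an executable file on a POSIX
# system; the target path and content are placeholders.
#
#     blob = Blob.from_string(b"#!/bin/sh\necho hi\n")
#     st = build_file_from_blob(blob, 0o100755, b"/tmp/hello.sh")
#     assert stat.S_IMODE(st.st_mode) & 0o111   # executable bit applied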


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    if stripped == b"git~1":
        return False
    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
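
# Illustration of the validators; the NTFS variant also rejects the 8.3
# short name Windows can assign to ".git":
#
#     assert validate_path(b"src/main.py")
#     assert not validate_path(b"src/.git/config")
#     assert not validate_path_element_ntfs(b"GIT~1")
#     assert not validate_path_element_ntfs(b".git.")   # trailing dot stripped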

1317 

1318 

1319def build_index_from_tree( 

1320 root_path: Union[str, bytes], 

1321 index_path: Union[str, bytes], 

1322 object_store: ObjectContainer, 

1323 tree_id: bytes, 

1324 honor_filemode: bool = True, 

1325 validate_path_element: Callable[[bytes], bool] = validate_path_element_default, 

1326 symlink_fn: Optional[Callable] = None, 

1327 blob_normalizer: Optional["BlobNormalizer"] = None, 

1328) -> None: 

1329 """Generate and materialize index from a tree. 

1330 

1331 Args: 

1332 tree_id: Tree to materialize 

1333 root_path: Target dir for materialized index files 

1334 index_path: Target path for generated index 

1335 object_store: Non-empty object store holding tree contents 

1336 honor_filemode: An optional flag to honor core.filemode setting in 

1337 config file, default is core.filemode=True, change executable bit 

1338 validate_path_element: Function to validate path elements to check 

1339 out; default just refuses .git and .. directories. 

1340 blob_normalizer: An optional BlobNormalizer to use for converting line 

1341 endings when writing blobs to the working directory. 

1342 

1343 Note: existing index is wiped and contents are not merged 

1344 in a working dir. Suitable only for fresh clones. 

1345 """ 

1346 index = Index(index_path, read=False) 

1347 if not isinstance(root_path, bytes): 

1348 root_path = os.fsencode(root_path) 

1349 

1350 for entry in iter_tree_contents(object_store, tree_id): 

1351 if not validate_path(entry.path, validate_path_element): 

1352 continue 

1353 full_path = _tree_to_fs_path(root_path, entry.path) 

1354 

1355 if not os.path.exists(os.path.dirname(full_path)): 

1356 os.makedirs(os.path.dirname(full_path)) 

1357 

1358 # TODO(jelmer): Merge new index into working tree 

1359 if S_ISGITLINK(entry.mode): 

1360 if not os.path.isdir(full_path): 

1361 os.mkdir(full_path) 

1362 st = os.lstat(full_path) 

1363 # TODO(jelmer): record and return submodule paths 

1364 else: 

1365 obj = object_store[entry.sha] 

1366 assert isinstance(obj, Blob) 

1367 # Apply blob normalization for checkout if normalizer is provided 

1368 if blob_normalizer is not None: 

1369 obj = blob_normalizer.checkout_normalize(obj, entry.path) 

1370 st = build_file_from_blob( 

1371 obj, 

1372 entry.mode, 

1373 full_path, 

1374 honor_filemode=honor_filemode, 

1375 symlink_fn=symlink_fn, 

1376 ) 

1377 

1378 # Add file to index 

1379 if not honor_filemode or S_ISGITLINK(entry.mode): 

1380 # we can not use tuple slicing to build a new tuple, 

1381 # because on windows that will convert the times to 

1382 # longs, which causes errors further along 

1383 st_tuple = ( 

1384 entry.mode, 

1385 st.st_ino, 

1386 st.st_dev, 

1387 st.st_nlink, 

1388 st.st_uid, 

1389 st.st_gid, 

1390 st.st_size, 

1391 st.st_atime, 

1392 st.st_mtime, 

1393 st.st_ctime, 

1394 ) 

1395 st = st.__class__(st_tuple) 

1396 # default to a stage 0 index entry (normal) 

1397 # when reading from the filesystem 

1398 index[entry.path] = index_entry_from_stat(st, entry.sha) 

1399 

1400 index.write() 

1401 

1402 

1403def blob_from_path_and_mode( 

1404 fs_path: bytes, mode: int, tree_encoding: str = "utf-8" 

1405) -> Blob: 

1406 """Create a blob from a path and a stat object. 

1407 

1408 Args: 

1409 fs_path: Full file system path to file 

1410 mode: File mode 

1411 Returns: A `Blob` object 

1412 """ 

1413 assert isinstance(fs_path, bytes) 

1414 blob = Blob() 

1415 if stat.S_ISLNK(mode): 

1416 if sys.platform == "win32": 

1417 # os.readlink on Python3 on Windows requires a unicode string. 

1418 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding) 

1419 else: 

1420 blob.data = os.readlink(fs_path) 

1421 else: 

1422 with open(fs_path, "rb") as f: 

1423 blob.data = f.read() 

1424 return blob 

1425 

1426 

1427def blob_from_path_and_stat( 

1428 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8" 

1429) -> Blob: 

1430 """Create a blob from a path and a stat object. 

1431 

1432 Args: 

1433 fs_path: Full file system path to file 

1434 st: A stat object 

1435 Returns: A `Blob` object 

1436 """ 

1437 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding) 

1438 

1439 

1440def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]: 

1441 """Read the head commit of a submodule. 

1442 

1443 Args: 

1444 path: path to the submodule 

1445 Returns: HEAD sha, None if not a valid head/repository 

1446 """ 

1447 from .errors import NotGitRepository 

1448 from .repo import Repo 

1449 

1450 # Repo currently expects a "str", so decode if necessary. 

1451 # TODO(jelmer): Perhaps move this into Repo() ? 

1452 if not isinstance(path, str): 

1453 path = os.fsdecode(path) 

1454 try: 

1455 repo = Repo(path) 

1456 except NotGitRepository: 

1457 return None 

1458 try: 

1459 return repo.head() 

1460 except KeyError: 

1461 return None 

1462 

1463 

1464def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool: 

1465 """Check if a directory has changed after getting an error. 

1466 

1467 When handling an error trying to create a blob from a path, call this 

1468 function. It will check if the path is a directory. If it's a directory 

1469 and a submodule, check the submodule head to see if it's has changed. If 

1470 not, consider the file as changed as Git tracked a file and not a 

1471 directory. 

1472 

1473 Return true if the given path should be considered as changed and False 

1474 otherwise or if the path is not a directory. 

1475 """ 

1476 # This is actually a directory 

1477 if os.path.exists(os.path.join(tree_path, b".git")): 

1478 # Submodule 

1479 head = read_submodule_head(tree_path) 

1480 if entry.sha != head: 

1481 return True 

1482 else: 

1483 # The file was changed to a directory, so consider it removed. 

1484 return True 

1485 

1486 return False 

1487 

1488 

1489os_sep_bytes = os.sep.encode("ascii") 

1490 

1491 

1492def _ensure_parent_dir_exists(full_path: bytes) -> None: 

1493 """Ensure parent directory exists, checking no parent is a file.""" 

1494 parent_dir = os.path.dirname(full_path) 

1495 if parent_dir and not os.path.exists(parent_dir): 

1496 # Check if any parent in the path is a file 

1497 parts = parent_dir.split(os_sep_bytes) 

1498 for i in range(len(parts)): 

1499 partial_path = os_sep_bytes.join(parts[: i + 1]) 

1500 if ( 

1501 partial_path 

1502 and os.path.exists(partial_path) 

1503 and not os.path.isdir(partial_path) 

1504 ): 

1505 # Parent path is a file, this is an error 

1506 raise OSError( 

1507 f"Cannot create directory, parent path is a file: {partial_path!r}" 

1508 ) 

1509 os.makedirs(parent_dir) 

1510 

1511 

1512def _remove_file_with_readonly_handling(path: bytes) -> None: 

1513 """Remove a file, handling read-only files on Windows. 

1514 

1515 Args: 

1516 path: Path to the file to remove 

1517 """ 

1518 try: 

1519 os.unlink(path) 

1520 except PermissionError: 

1521 # On Windows, remove read-only attribute and retry 

1522 if sys.platform == "win32": 

1523 os.chmod(path, stat.S_IWRITE | stat.S_IREAD) 

1524 os.unlink(path) 

1525 else: 

1526 raise 

1527 

1528 

1529def _remove_empty_parents(path: bytes, stop_at: bytes) -> None: 

1530 """Remove empty parent directories up to stop_at.""" 

1531 parent = os.path.dirname(path) 

1532 while parent and parent != stop_at: 

1533 try: 

1534 os.rmdir(parent) 

1535 parent = os.path.dirname(parent) 

1536 except FileNotFoundError: 

1537 # Directory doesn't exist - stop trying 

1538 break 

1539 except OSError as e: 

1540 if e.errno == errno.ENOTEMPTY: 

1541 # Directory not empty - stop trying 

1542 break 

1543 raise 

1544 

1545 

1546def _check_symlink_matches( 

1547 full_path: bytes, repo_object_store, entry_sha: bytes 

1548) -> bool: 

1549 """Check if symlink target matches expected target. 

1550 

1551 Returns True if symlink needs to be written, False if it matches. 

1552 """ 

1553 try: 

1554 current_target = os.readlink(full_path) 

1555 blob_obj = repo_object_store[entry_sha] 

1556 expected_target = blob_obj.as_raw_string() 

1557 if isinstance(current_target, str): 

1558 current_target = current_target.encode() 

1559 return current_target != expected_target 

1560 except FileNotFoundError: 

1561 # Symlink doesn't exist 

1562 return True 

1563 except OSError as e: 

1564 if e.errno == errno.EINVAL: 

1565 # Not a symlink 

1566 return True 

1567 raise 

1568 

1569 

1570def _check_file_matches( 

1571 repo_object_store, 

1572 full_path: bytes, 

1573 entry_sha: bytes, 

1574 entry_mode: int, 

1575 current_stat: os.stat_result, 

1576 honor_filemode: bool, 

1577 blob_normalizer: Optional["BlobNormalizer"] = None, 

1578 tree_path: Optional[bytes] = None, 

1579) -> bool: 

1580 """Check if a file on disk matches the expected git object. 

1581 

1582 Returns True if file needs to be written, False if it matches. 

1583 """ 

1584 # Check mode first (if honor_filemode is True) 

1585 if honor_filemode: 

1586 current_mode = stat.S_IMODE(current_stat.st_mode) 

1587 expected_mode = stat.S_IMODE(entry_mode) 

1588 if current_mode != expected_mode: 

1589 return True 

1590 

1591 # If mode matches (or we don't care), check content via size first 

1592 blob_obj = repo_object_store[entry_sha] 

1593 if current_stat.st_size != blob_obj.raw_length(): 

1594 return True 

1595 

1596 # Size matches, check actual content 

1597 try: 

1598 with open(full_path, "rb") as f: 

1599 current_content = f.read() 

1600 expected_content = blob_obj.as_raw_string() 

1601 if blob_normalizer and tree_path is not None: 

1602 normalized_blob = blob_normalizer.checkout_normalize( 

1603 blob_obj, tree_path 

1604 ) 

1605 expected_content = normalized_blob.as_raw_string() 

1606 return current_content != expected_content 

1607 except (FileNotFoundError, PermissionError, IsADirectoryError): 

1608 return True 

1609 

1610 

1611def _transition_to_submodule(repo, path, full_path, current_stat, entry, index): 

1612 """Transition any type to submodule.""" 

1613 from .submodule import ensure_submodule_placeholder 

1614 

1615 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

1616 # Already a directory, just ensure .git file exists 

1617 ensure_submodule_placeholder(repo, path) 

1618 else: 

1619 # Remove whatever is there and create submodule 

1620 if current_stat is not None: 

1621 _remove_file_with_readonly_handling(full_path) 

1622 ensure_submodule_placeholder(repo, path) 

1623 

1624 st = os.lstat(full_path) 

1625 index[path] = index_entry_from_stat(st, entry.sha) 

1626 

1627 

1628def _transition_to_file( 

1629 object_store, 

1630 path, 

1631 full_path, 

1632 current_stat, 

1633 entry, 

1634 index, 

1635 honor_filemode, 

1636 symlink_fn, 

1637 blob_normalizer, 

1638): 

1639 """Transition any type to regular file or symlink.""" 

1640 # Check if we need to update 

1641 if ( 

1642 current_stat is not None 

1643 and stat.S_ISREG(current_stat.st_mode) 

1644 and not stat.S_ISLNK(entry.mode) 

1645 ): 

1646 # File to file - check if update needed 

1647 needs_update = _check_file_matches( 

1648 object_store, 

1649 full_path, 

1650 entry.sha, 

1651 entry.mode, 

1652 current_stat, 

1653 honor_filemode, 

1654 blob_normalizer, 

1655 path, 

1656 ) 

1657 elif ( 

1658 current_stat is not None 

1659 and stat.S_ISLNK(current_stat.st_mode) 

1660 and stat.S_ISLNK(entry.mode) 

1661 ): 

1662 # Symlink to symlink - check if update needed 

1663 needs_update = _check_symlink_matches(full_path, object_store, entry.sha) 

1664 else: 

1665 needs_update = True 

1666 

1667 if not needs_update: 

1668 # Just update index - current_stat should always be valid here since we're not updating 

1669 index[path] = index_entry_from_stat(current_stat, entry.sha) 

1670 return 

1671 

1672 # Remove existing entry if needed 

1673 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode): 

1674 # Remove directory 

1675 dir_contents = set(os.listdir(full_path)) 

1676 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

1677 

1678 if git_file_name in dir_contents: 

1679 if dir_contents != {git_file_name}: 

1680 raise IsADirectoryError( 

1681 f"Cannot replace submodule with untracked files: {full_path!r}" 

1682 ) 

1683 shutil.rmtree(full_path) 

1684 else: 

1685 try: 

1686 os.rmdir(full_path) 

1687 except OSError as e: 

1688 if e.errno == errno.ENOTEMPTY: 

1689 raise IsADirectoryError( 

1690 f"Cannot replace non-empty directory with file: {full_path!r}" 

1691 ) 

1692 raise 

1693 elif current_stat is not None: 

1694 _remove_file_with_readonly_handling(full_path) 

1695 

1696 # Ensure parent directory exists 

1697 _ensure_parent_dir_exists(full_path) 

1698 

1699 # Write the file 

1700 blob_obj = object_store[entry.sha] 

1701 assert isinstance(blob_obj, Blob) 

1702 if blob_normalizer: 

1703 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path) 

1704 st = build_file_from_blob( 

1705 blob_obj, 

1706 entry.mode, 

1707 full_path, 

1708 honor_filemode=honor_filemode, 

1709 symlink_fn=symlink_fn, 

1710 ) 

1711 index[path] = index_entry_from_stat(st, entry.sha) 

1712 

1713 

1714def _transition_to_absent(repo, path, full_path, current_stat, index): 

1715 """Remove any type of entry.""" 

1716 if current_stat is None: 

1717 return 

1718 

1719 if stat.S_ISDIR(current_stat.st_mode): 

1720 # Check if it's a submodule directory 

1721 dir_contents = set(os.listdir(full_path)) 

1722 git_file_name = b".git" if isinstance(full_path, bytes) else ".git" 

1723 

1724 if git_file_name in dir_contents and dir_contents == {git_file_name}: 

1725 shutil.rmtree(full_path) 

1726 else: 

1727 try: 

1728 os.rmdir(full_path) 

1729 except OSError as e: 

1730 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST): 

1731 raise 

1732 else: 

1733 _remove_file_with_readonly_handling(full_path) 

1734 

1735 try: 

1736 del index[path] 

1737 except KeyError: 

1738 pass 

1739 

1740 # Try to remove empty parent directories 

1741 _remove_empty_parents( 

1742 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

1743 ) 

1744 

1745 

1746def update_working_tree( 

1747 repo: "Repo", 

1748 old_tree_id: Optional[bytes], 

1749 new_tree_id: bytes, 

1750 honor_filemode: bool = True, 

1751 validate_path_element: Optional[Callable[[bytes], bool]] = None, 

1752 symlink_fn: Optional[Callable] = None, 

1753 force_remove_untracked: bool = False, 

1754 blob_normalizer: Optional["BlobNormalizer"] = None, 

1755) -> None: 

1756 """Update the working tree and index to match a new tree. 

1757 

1758 This function handles: 

1759 - Adding new files 

1760 - Updating modified files 

1761 - Removing deleted files 

1762 - Cleaning up empty directories 

1763 

1764 Args: 

1765 repo: Repository object 

1766 old_tree_id: SHA of the tree before the update 

1767 new_tree_id: SHA of the tree to update to 

1768 honor_filemode: An optional flag to honor core.filemode setting 

1769 validate_path_element: Function to validate path elements to check out 

1770 symlink_fn: Function to use for creating symlinks 

1771 force_remove_untracked: If True, remove files that exist in working 

1772 directory but not in target tree, even if old_tree_id is None 

1773 blob_normalizer: An optional BlobNormalizer to use for converting line 

1774 endings when writing blobs to the working directory. 

1775 """ 

1776 if validate_path_element is None: 

1777 validate_path_element = validate_path_element_default 

1778 

1779 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode() 

1780 index = repo.open_index() 

1781 

1782 # Build sets of paths for efficient lookup 

1783 new_paths = {} 

1784 for entry in iter_tree_contents(repo.object_store, new_tree_id): 

1785 if entry.path.startswith(b".git") or not validate_path( 

1786 entry.path, validate_path_element 

1787 ): 

1788 continue 

1789 new_paths[entry.path] = entry 

1790 

1791 old_paths = {} 

1792 if old_tree_id: 

1793 for entry in iter_tree_contents(repo.object_store, old_tree_id): 

1794 if not entry.path.startswith(b".git"): 

1795 old_paths[entry.path] = entry 

1796 

1797 # Process all paths 

1798 all_paths = set(new_paths.keys()) | set(old_paths.keys()) 

1799 

1800 # Check for paths that need to become directories 

1801 paths_needing_dir = set() 

1802 for path in new_paths: 

1803 parts = path.split(b"/") 

1804 for i in range(1, len(parts)): 

1805 parent = b"/".join(parts[:i]) 

1806 if parent in old_paths and parent not in new_paths: 

1807 paths_needing_dir.add(parent) 

1808 

1809 # Check if any path that needs to become a directory has been modified 

1810 current_stat: Optional[os.stat_result] 

1811 stat_cache: dict[bytes, Optional[os.stat_result]] = {} 

1812 for path in paths_needing_dir: 

1813 full_path = _tree_to_fs_path(repo_path, path) 

1814 try: 

1815 current_stat = os.lstat(full_path) 

1816 except FileNotFoundError: 

1817 # File doesn't exist, proceed 

1818 stat_cache[full_path] = None 

1819 except PermissionError: 

1820 # Can't read file, proceed 

1821 pass 

1822 else: 

1823 stat_cache[full_path] = current_stat 

1824 if stat.S_ISREG(current_stat.st_mode): 

1825 # Check if file has been modified 

1826 old_entry = old_paths[path] 

1827 if not _check_file_matches( 

1828 repo.object_store, 

1829 full_path, 

1830 old_entry.sha, 

1831 old_entry.mode, 

1832 current_stat, 

1833 honor_filemode, 

1834 blob_normalizer, 

1835 path, 

1836 ): 

1837 # File has been modified, can't replace with directory 

1838 raise OSError( 

1839 f"Cannot replace modified file with directory: {path!r}" 

1840 ) 

1841 

1842 # Process in two passes: deletions first, then additions/updates 

1843 # This handles case-only renames on case-insensitive filesystems correctly 

1844 paths_to_remove = [] 

1845 paths_to_update = [] 

1846 

1847 for path in sorted(all_paths): 

1848 if path in new_paths: 

1849 paths_to_update.append(path) 

1850 else: 

1851 paths_to_remove.append(path) 

1852 

1853 # First process removals 

1854 for path in paths_to_remove: 

1855 full_path = _tree_to_fs_path(repo_path, path) 

1856 

1857 # Determine current state - use cache if available 

1858 try: 

1859 current_stat = stat_cache[full_path] 

1860 except KeyError: 

1861 try: 

1862 current_stat = os.lstat(full_path) 

1863 except FileNotFoundError: 

1864 current_stat = None 

1865 

1866 _transition_to_absent(repo, path, full_path, current_stat, index) 

1867 

1868 # Then process additions/updates 

1869 for path in paths_to_update: 

1870 full_path = _tree_to_fs_path(repo_path, path) 

1871 

1872 # Determine current state - use cache if available 

1873 try: 

1874 current_stat = stat_cache[full_path] 

1875 except KeyError: 

1876 try: 

1877 current_stat = os.lstat(full_path) 

1878 except FileNotFoundError: 

1879 current_stat = None 

1880 

1881 new_entry = new_paths[path] 

1882 

1883 # Path should exist 

1884 if S_ISGITLINK(new_entry.mode): 

1885 _transition_to_submodule( 

1886 repo, path, full_path, current_stat, new_entry, index 

1887 ) 

1888 else: 

1889 _transition_to_file( 

1890 repo.object_store, 

1891 path, 

1892 full_path, 

1893 current_stat, 

1894 new_entry, 

1895 index, 

1896 honor_filemode, 

1897 symlink_fn, 

1898 blob_normalizer, 

1899 ) 

1900 

1901 # Handle force_remove_untracked 

1902 if force_remove_untracked: 

1903 for root, dirs, files in os.walk(repo_path): 

1904 if b".git" in os.fsencode(root): 

1905 continue 

1906 root_bytes = os.fsencode(root) 

1907 for file in files: 

1908 full_path = os.path.join(root_bytes, os.fsencode(file)) 

1909 tree_path = os.path.relpath(full_path, repo_path) 

1910 if os.sep != "/": 

1911 tree_path = tree_path.replace(os.sep.encode(), b"/") 

1912 

1913 if tree_path not in new_paths: 

1914 _remove_file_with_readonly_handling(full_path) 

1915 if tree_path in index: 

1916 del index[tree_path] 

1917 

1918 # Clean up empty directories 

1919 for root, dirs, files in os.walk(repo_path, topdown=False): 

1920 root_bytes = os.fsencode(root) 

1921 if ( 

1922 b".git" not in root_bytes 

1923 and root_bytes != repo_path 

1924 and not files 

1925 and not dirs 

1926 ): 

1927 try: 

1928 os.rmdir(root) 

1929 except OSError: 

1930 pass 

1931 

1932 index.write() 

1933 

1934 
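A usage sketch, assuming a non-bare repository at ./repo with a local branch b"refs/heads/feature" (both hypothetical); it moves the working tree and index from the current HEAD tree to that branch's tree.

from dulwich.repo import Repo
from dulwich.index import update_working_tree

repo = Repo("./repo")                                    # hypothetical repository
old_tree = repo[repo.head()].tree                        # tree of the current HEAD commit
new_tree = repo[repo.refs[b"refs/heads/feature"]].tree   # tree to check out
update_working_tree(repo, old_tree, new_tree)            # working tree + index now match new_tree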

1935def get_unstaged_changes( 

1936 index: Index, 

1937 root_path: Union[str, bytes], 

1938 filter_blob_callback: Optional[Callable] = None, 

1939) -> Generator[bytes, None, None]: 

1940 """Walk through an index and check for differences against working tree. 

1941 

1942 Args: 

1943 index: index to check 

1944 root_path: path in which to find files 

 filter_blob_callback: optional callback applied to each blob before it is compared against the index entry 

1945 Returns: iterator over paths with unstaged changes 

1946 """ 

1947 # For each entry in the index check the sha1 & ensure not staged 

1948 if not isinstance(root_path, bytes): 

1949 root_path = os.fsencode(root_path) 

1950 

1951 for tree_path, entry in index.iteritems(): 

1952 full_path = _tree_to_fs_path(root_path, tree_path) 

1953 if isinstance(entry, ConflictedIndexEntry): 

1954 # Conflicted files are always unstaged 

1955 yield tree_path 

1956 continue 

1957 

1958 try: 

1959 st = os.lstat(full_path) 

1960 if stat.S_ISDIR(st.st_mode): 

1961 if _has_directory_changed(tree_path, entry): 

1962 yield tree_path 

1963 continue 

1964 

1965 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode): 

1966 continue 

1967 

1968 blob = blob_from_path_and_stat(full_path, st) 

1969 

1970 if filter_blob_callback is not None: 

1971 blob = filter_blob_callback(blob, tree_path) 

1972 except FileNotFoundError: 

1973 # The file was removed, so we assume that counts as 

1974 # different from whatever file used to exist. 

1975 yield tree_path 

1976 else: 

1977 if blob.id != entry.sha: 

1978 yield tree_path 

1979 

1980 
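A short sketch, assuming a repository at ./repo (hypothetical); it prints every tracked path whose working-tree contents no longer match the index.

from dulwich.repo import Repo
from dulwich.index import get_unstaged_changes

repo = Repo("./repo")  # hypothetical repository
for tree_path in get_unstaged_changes(repo.open_index(), repo.path):
    print(tree_path.decode("utf-8", "replace"))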

1981def _tree_to_fs_path(root_path: bytes, tree_path: bytes) -> bytes: 

1982 """Convert a git tree path to a file system path. 

1983 

1984 Args: 

1985 root_path: Root filesystem path 

1986 tree_path: Git tree path as bytes 

1987 

1988 Returns: File system path. 

1989 """ 

1990 assert isinstance(tree_path, bytes) 

1991 if os_sep_bytes != b"/": 

1992 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes) 

1993 else: 

1994 sep_corrected_path = tree_path 

1995 return os.path.join(root_path, sep_corrected_path) 

1996 

1997 

1998def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes: 

1999 """Convert a file system path to a git tree path. 

2000 

2001 Args: 

2002 fs_path: File system path. 

2003 

2004 Returns: Git tree path as bytes 

2005 """ 

2006 if not isinstance(fs_path, bytes): 

2007 fs_path_bytes = os.fsencode(fs_path) 

2008 else: 

2009 fs_path_bytes = fs_path 

2010 if os_sep_bytes != b"/": 

2011 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/") 

2012 else: 

2013 tree_path = fs_path_bytes 

2014 return tree_path 

2015 

2016 
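These two helpers are inverses over the path portion: git tree paths always use b"/", filesystem paths use os.sep. A small illustration with a made-up root:

import os
from dulwich.index import _fs_to_tree_path, _tree_to_fs_path

fs_path = _tree_to_fs_path(b"repo", b"docs/readme.md")
# b"repo/docs/readme.md" on POSIX, b"repo\\docs\\readme.md" on Windows
assert _fs_to_tree_path(os.sep.join(["docs", "readme.md"])) == b"docs/readme.md"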

2017def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]: 

 """Create an index entry for a directory, or None unless it is a submodule checkout.""" 

2018 if os.path.exists(os.path.join(path, b".git")): 

2019 head = read_submodule_head(path) 

2020 if head is None: 

2021 return None 

2022 return index_entry_from_stat(st, head, mode=S_IFGITLINK) 

2023 return None 

2024 

2025 
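A sketch of the submodule case this helper covers: a directory only produces an entry when it contains a .git file or directory, in which case the entry records the submodule's HEAD under gitlink mode (0o160000). The path below is made up.

import os
from dulwich.index import index_entry_from_directory

path = b"./repo/vendored-lib"             # hypothetical submodule checkout
st = os.lstat(path)
entry = index_entry_from_directory(st, path)
if entry is not None:
    print(entry.sha, oct(entry.mode))     # submodule HEAD sha, 0o160000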

2026def index_entry_from_path( 

2027 path: bytes, object_store: Optional[ObjectContainer] = None 

2028) -> Optional[IndexEntry]: 

2029 """Create an index from a filesystem path. 

2030 

2031 This returns an index entry for files, symlinks 

2032 and tree references. For directories and 

2033 non-existent files it returns None. 

2034 

2035 Args: 

2036 path: Path to create an index entry for 

2037 object_store: Optional object store to 

2038 save new blobs in 

2039 Returns: An index entry; None for directories 

2040 """ 

2041 assert isinstance(path, bytes) 

2042 st = os.lstat(path) 

2043 if stat.S_ISDIR(st.st_mode): 

2044 return index_entry_from_directory(st, path) 

2045 

2046 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode): 

2047 blob = blob_from_path_and_stat(path, st) 

2048 if object_store is not None: 

2049 object_store.add_object(blob) 

2050 return index_entry_from_stat(st, blob.id) 

2051 

2052 return None 

2053 

2054 
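A usage sketch, assuming ./repo/README.md exists (hypothetical); the blob is written into the repository's object store and the returned entry carries its sha and mode.

from dulwich.repo import Repo
from dulwich.index import index_entry_from_path

repo = Repo("./repo")  # hypothetical repository
entry = index_entry_from_path(b"./repo/README.md", object_store=repo.object_store)
if entry is not None:
    print(entry.sha, oct(entry.mode))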

2055def iter_fresh_entries( 

2056 paths: Iterable[bytes], 

2057 root_path: bytes, 

2058 object_store: Optional[ObjectContainer] = None, 

2059) -> Iterator[tuple[bytes, Optional[IndexEntry]]]: 

2060 """Iterate over current versions of index entries on disk. 

2061 

2062 Args: 

2063 paths: Paths to iterate over 

2064 root_path: Root path to access from 

2065 object_store: Optional store to save new blobs in 

2066 Returns: Iterator over path, index_entry 

2067 """ 

2068 for path in paths: 

2069 p = _tree_to_fs_path(root_path, path) 

2070 try: 

2071 entry = index_entry_from_path(p, object_store=object_store) 

2072 except (FileNotFoundError, IsADirectoryError): 

2073 entry = None 

2074 yield path, entry 

2075 

2076 
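A short sketch with made-up paths, re-statting two tracked files relative to a repository root; a missing or unreadable path yields None for its entry.

from dulwich.index import iter_fresh_entries

for path, entry in iter_fresh_entries([b"README.md", b"src/main.py"], b"./repo"):
    print(path, "missing" if entry is None else entry.sha)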

2077def iter_fresh_objects( 

2078 paths: Iterable[bytes], 

2079 root_path: bytes, 

2080 include_deleted: bool = False, 

2081 object_store: Optional[ObjectContainer] = None, 

2082) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]: 

2083 """Iterate over versions of objects on disk referenced by index. 

2084 

2085 Args: 

 paths: Paths to iterate over 

2086 root_path: Root path to access from 

2087 include_deleted: Include deleted entries with sha and 

2088 mode set to None 

2089 object_store: Optional object store to report new items to 

2090 Returns: Iterator over path, sha, mode 

2091 """ 

2092 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store): 

2093 if entry is None: 

2094 if include_deleted: 

2095 yield path, None, None 

2096 else: 

2097 yield path, entry.sha, cleanup_mode(entry.mode) 

2098 

2099 
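The same traversal expressed as (path, sha, mode) triples; with include_deleted=True a vanished file shows up as (path, None, None). Paths here are made up.

from dulwich.index import iter_fresh_objects

for path, sha, mode in iter_fresh_objects(
    [b"README.md", b"gone.txt"], b"./repo", include_deleted=True
):
    print(path, sha, mode)  # gone.txt -> (b"gone.txt", None, None) if deleted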

2100def refresh_index(index: Index, root_path: bytes) -> None: 

2101 """Refresh the contents of an index. 

2102 

2103 This is roughly the refresh that 'git commit -a' performs before committing: every tracked entry is re-read from the working tree. 

2104 

2105 Args: 

2106 index: Index to update 

2107 root_path: Root filesystem path 

2108 """ 

2109 for path, entry in iter_fresh_entries(index, root_path): 

2110 if entry: 

2111 index[path] = entry 

2112 

2113 
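A usage sketch, assuming a repository at ./repo (hypothetical); refresh_index only mutates the in-memory Index, so the caller writes it back.

import os
from dulwich.repo import Repo
from dulwich.index import refresh_index

repo = Repo("./repo")  # hypothetical repository
index = repo.open_index()
refresh_index(index, os.fsencode(repo.path))
index.write()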

2114class locked_index: 

2115 """Lock the index while making modifications. 

2116 

2117 Works as a context manager. 

2118 """ 

2119 

2120 _file: "_GitFile" 

2121 

2122 def __init__(self, path: Union[bytes, str]) -> None: 

2123 self._path = path 

2124 

2125 def __enter__(self) -> Index: 

2126 self._file = GitFile(self._path, "wb") 

2127 self._index = Index(self._path) 

2128 return self._index 

2129 

2130 def __exit__( 

2131 self, 

2132 exc_type: Optional[type], 

2133 exc_value: Optional[BaseException], 

2134 traceback: Optional[types.TracebackType], 

2135 ) -> None: 

2136 if exc_type is not None: 

2137 self._file.abort() 

2138 return 

2139 try: 


2142 f = SHA1Writer(cast(BinaryIO, self._file)) 

2143 write_index_dict(cast(BinaryIO, f), self._index._byname) 

2144 except BaseException: 

2145 self._file.abort() 

2146 else: 

2147 f.close()
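
A usage sketch of the context manager, assuming ./repo/.git/index exists (hypothetical path): the lock file is taken on entry, the rewritten index replaces it on a clean exit, and the lock is aborted if the block raises.

from dulwich.index import locked_index

with locked_index("./repo/.git/index") as index:
    try:
        del index[b"obsolete.txt"]    # hypothetical stale entry
    except KeyError:
        pass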