# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import (
    Callable,
    Generator,
    Iterable,
    Iterator,
    Mapping,
    Sequence,
    Set,
)
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, "TreeDict | tuple[int, bytes]"]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode

    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data

    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
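

# A minimal round-trip sketch of the varint helpers above (illustrative only,
# not part of the original module): 300 encodes to two bytes, 0xAC 0x02.
#
#     >>> _encode_varint(300)
#     b'\xac\x02'
#     >>> _decode_varint(b'\xac\x02')
#     (300, 2)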


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison

    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression

    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
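

# An illustrative round trip for index v4 path compression (not part of the
# original module): only the changed suffix of the path is stored.
#
#     >>> _compress_path(b"dir/file2.txt", b"dir/file1.txt")
#     b'\x052.txt\x00'
#     >>> _decompress_path(b"\x052.txt\x00", 0, b"dir/file1.txt")
#     (b'dir/file2.txt', 7)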


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression

    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
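

# Stage numbers are packed into bits 12-13 of an entry's flags field. A quick
# illustrative sketch (not part of the original module):
#
#     >>> flags = 2 << FLAG_STAGESHIFT  # stage 2 ("this" side during a merge)
#     >>> Stage((flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#     <Stage.MERGE_CONFLICT_THIS: 2>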


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data

        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that don't understand sparse index should avoid interacting with
    the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
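

# Illustrative behavior of the path helpers above (not part of the original
# module): paths are byte strings and empty components are dropped on join.
#
#     >>> pathsplit(b"a/b/c.txt")
#     (b'a/b', b'c.txt')
#     >>> pathsplit(b"top.txt")
#     (b'', b'top.txt')
#     >>> pathjoin(b"", b"a/b", b"c.txt")
#     b'a/b/c.txt'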


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from

    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
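

# A small round-trip sketch of the cache-time helpers (illustrative only):
# floats are split into whole seconds and nanoseconds before packing.
#
#     >>> import io
#     >>> buf = io.BytesIO()
#     >>> write_cache_time(buf, 1700000000.5)
#     >>> buf.seek(0)
#     0
#     >>> read_cache_time(buf)
#     (1700000000, 500000000)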


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
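

# The on-disk header is simply the b"DIRC" magic followed by two big-endian
# 32-bit words: version and entry count. An illustrative sketch (not part of
# the original module):
#
#     >>> import io, struct
#     >>> read_index_header(io.BytesIO(b"DIRC" + struct.pack(">LL", 2, 0)))
#     (2, 0)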


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    Keys are paths; conflicted paths map to a single ConflictedIndexEntry
    holding the per-stage entries, since a path alone is not unique during
    a merge.

    Args:
      f: File object to read from
    """
    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require at least format version 3, so bump the version
    # if any entry uses them.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    # Sanity check: no extended flags may remain if we are still below v3.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are written sorted by path; conflicted entries are expanded
    into one serialized entry per stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
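

# Illustrative behavior of cleanup_mode (not part of the original module):
# regular files collapse to 0o644 or 0o755 depending on the owner execute bit.
#
#     >>> oct(cleanup_mode(0o100600))
#     '0o100644'
#     >>> oct(cleanup_mode(0o100764))
#     '0o100755'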


class Index:
    """A Git Index file."""

    _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]

    def __init__(
        self,
        filename: bytes | str | os.PathLike[str],
        read: bool = True,
        skip_hash: bool = False,
        version: int | None = None,
        *,
        file_mode: int | None = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
          file_mode: Optional file permission mask for shared repository
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._file_mode = file_mode
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> bytes | str:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        mask = self._file_mode if self._file_mode is not None else 0o644
        f = GitFile(self._filename, "wb", mask=mask)
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: IndexEntry | ConflictedIndexEntry
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[bytes | None, bytes | None],
            tuple[int | None, int | None],
            tuple[bytes | None, bytes | None],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported

        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in

        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: bytes,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=subtree_sha,
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> bytes | None:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id
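

# A hedged usage sketch for the Index class (illustrative; the repository
# layout and the blob_sha placeholder are assumed, not part of this module):
#
#     >>> index = Index(".git/index")                   # doctest: +SKIP
#     >>> for path, entry in index.items():             # doctest: +SKIP
#     ...     print(path, entry.sha)
#     >>> index[b"new.txt"] = index_entry_from_stat(    # doctest: +SKIP
#     ...     os.lstat("new.txt"), blob_sha)
#     >>> index.write()                                 # doctest: +SKIP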


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries

    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
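

# A hedged sketch of committing a tree from blob entries (illustrative only;
# MemoryObjectStore comes from dulwich.object_store, and the example path and
# contents are assumptions):
#
#     >>> from dulwich.object_store import MemoryObjectStore  # doctest: +SKIP
#     >>> store = MemoryObjectStore()                         # doctest: +SKIP
#     >>> blob = Blob.from_string(b"hello\n")                 # doctest: +SKIP
#     >>> store.add_object(blob)                              # doctest: +SKIP
#     >>> tree_id = commit_tree(                              # doctest: +SKIP
#     ...     store, [(b"docs/readme.txt", blob.id, 0o100644)])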


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file

    Note: This function is deprecated, use index.commit() instead.

    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: bytes | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported

    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )
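

# A hedged sketch of building an entry from the filesystem (illustrative; the
# path and the all-zero hex SHA below are assumptions, not real object data):
#
#     >>> st = os.lstat("README")                        # doctest: +SKIP
#     >>> entry = index_entry_from_stat(st, b"0" * 40)   # doctest: +SKIP
#     >>> entry.mode == cleanup_mode(st.st_mode)         # doctest: +SKIP
#     True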


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: str | None) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super().__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: str | bytes,
        dst: str | bytes,
        target_is_directory: bool = False,
        *,
        dir_fd: int | None = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks

    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()
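

# Illustrative NTFS normalization (not part of the original module): trailing
# dots and spaces are stripped before lowercasing, which is how names like
# b".GIT. " are caught by the validators below.
#
#     >>> _normalize_path_element_ntfs(b".GIT. ")
#     b'.git'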


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Validate a path by checking each /-separated element.

    With the default validator this just refuses ".git" and dot-name
    components.
    """
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
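

# Illustrative behavior (not part of the original module): any component that
# normalizes to b".git" rejects the whole path.
#
#     >>> validate_path(b"src/main.py")
#     True
#     >>> validate_path(b"src/.git/hooks/pre-commit")
#     False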


def build_index_from_tree(
    root_path: str | bytes,
    index_path: str | bytes,
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        assert (
            entry.path is not None and entry.mode is not None and entry.sha is not None
        )
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we cannot use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
1977def blob_from_path_and_mode(
1978 fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
1979) -> Blob:
1980 """Create a blob from a path and a stat object.
1982 Args:
1983 fs_path: Full file system path to file
1984 mode: File mode
1985 tree_encoding: Encoding to use for tree contents
1986 Returns: A `Blob` object
1987 """
1988 assert isinstance(fs_path, bytes)
1989 blob = Blob()
1990 if stat.S_ISLNK(mode):
1991 if sys.platform == "win32":
1992 # os.readlink on Python3 on Windows requires a unicode string.
1993 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
1994 else:
1995 blob.data = os.readlink(fs_path)
1996 else:
1997 with open(fs_path, "rb") as f:
1998 blob.data = f.read()
1999 return blob
2002def blob_from_path_and_stat(
2003 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
2004) -> Blob:
2005 """Create a blob from a path and a stat object.
2007 Args:
2008 fs_path: Full file system path to file
2009 st: A stat object
2010 tree_encoding: Encoding to use for tree contents
2011 Returns: A `Blob` object
2012 """
2013 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
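# Example (illustrative, hypothetical path): turning a working-tree file
# into a Blob whose id can be compared against an index entry.
#
#     st = os.lstat(b"/tmp/repo/README")
#     blob = blob_from_path_and_stat(b"/tmp/repo/README", st)
#     print(blob.id)  # hex SHA-1 of the blob contents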
2016def read_submodule_head(path: str | bytes) -> bytes | None:
2017 """Read the head commit of a submodule.
2019 Args:
2020 path: path to the submodule
2021 Returns: HEAD sha, None if not a valid head/repository
2022 """
2023 from .errors import NotGitRepository
2024 from .repo import Repo
2026 # Repo currently expects a "str", so decode if necessary.
2027 # TODO(jelmer): Perhaps move this into Repo() ?
2028 if not isinstance(path, str):
2029 path = os.fsdecode(path)
2030 try:
2031 repo = Repo(path)
2032 except NotGitRepository:
2033 return None
2034 try:
2035 return repo.head()
2036 except KeyError:
2037 return None
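# Example (illustrative, hypothetical path): probing a checked-out
# submodule for its HEAD; None means the path is not a usable repository.
#
#     head = read_submodule_head(b"vendor/libfoo")
#     if head is None:
#         print("not a submodule checkout")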
2040def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
2041 """Check if a directory has changed after getting an error.
2043 When handling an error trying to create a blob from a path, call this
2044 function. It checks whether the path is a directory. If it is a directory
2045 and a submodule, it compares the submodule head against the entry to see
2046 if it has changed. If it is a plain directory, the entry is considered
2047 changed, since Git tracked a file there, not a directory.
2049 Return True if the given path should be considered changed, and False
2050 otherwise (including when the path is not a directory).
2051 """
2052 # This is actually a directory
2053 if os.path.exists(os.path.join(tree_path, b".git")):
2054 # Submodule
2055 head = read_submodule_head(tree_path)
2056 if entry.sha != head:
2057 return True
2058 else:
2059 # The file was changed to a directory, so consider it removed.
2060 return True
2062 return False
2065os_sep_bytes = os.sep.encode("ascii")
2068def _ensure_parent_dir_exists(full_path: bytes) -> None:
2069 """Ensure parent directory exists, checking no parent is a file."""
2070 parent_dir = os.path.dirname(full_path)
2071 if parent_dir and not os.path.exists(parent_dir):
2072 # Walk up the directory tree to find the first existing parent
2073 current = parent_dir
2074 parents_to_check: list[bytes] = []
2076 while current and not os.path.exists(current):
2077 parents_to_check.insert(0, current)
2078 new_parent = os.path.dirname(current)
2079 if new_parent == current:
2080 # Reached the root or can't go up further
2081 break
2082 current = new_parent
2084 # Check if the existing parent (if any) is a directory
2085 if current and os.path.exists(current) and not os.path.isdir(current):
2086 raise OSError(
2087 f"Cannot create directory, parent path is a file: {current!r}"
2088 )
2090 # Now check each parent we need to create isn't blocked by an existing file
2091 for parent_path in parents_to_check:
2092 if os.path.exists(parent_path) and not os.path.isdir(parent_path):
2093 raise OSError(
2094 f"Cannot create directory, parent path is a file: {parent_path!r}"
2095 )
2097 os.makedirs(parent_dir)
2100def _remove_file_with_readonly_handling(path: bytes) -> None:
2101 """Remove a file, handling read-only files on Windows.
2103 Args:
2104 path: Path to the file to remove
2105 """
2106 try:
2107 os.unlink(path)
2108 except PermissionError:
2109 # On Windows, remove read-only attribute and retry
2110 if sys.platform == "win32":
2111 os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
2112 os.unlink(path)
2113 else:
2114 raise
2117def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
2118 """Remove empty parent directories up to stop_at."""
2119 parent = os.path.dirname(path)
2120 while parent and parent != stop_at:
2121 try:
2122 os.rmdir(parent)
2123 parent = os.path.dirname(parent)
2124 except FileNotFoundError:
2125 # Directory doesn't exist - stop trying
2126 break
2127 except OSError as e:
2128 if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
2129 # Directory not empty - stop trying
2130 break
2131 raise
2134def _check_symlink_matches(
2135 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
2136) -> bool:
2137 """Check if symlink target matches expected target.
2139 Returns True if symlink matches, False if it doesn't match.
2140 """
2141 try:
2142 current_target = os.readlink(full_path)
2143 blob_obj = repo_object_store[entry_sha]
2144 expected_target = blob_obj.as_raw_string()
2145 if isinstance(current_target, str):
2146 current_target = current_target.encode()
2147 return current_target == expected_target
2148 except FileNotFoundError:
2149 # Symlink doesn't exist
2150 return False
2151 except OSError as e:
2152 if e.errno == errno.EINVAL:
2153 # Not a symlink
2154 return False
2155 raise
2158def _check_file_matches(
2159 repo_object_store: "BaseObjectStore",
2160 full_path: bytes,
2161 entry_sha: bytes,
2162 entry_mode: int,
2163 current_stat: os.stat_result,
2164 honor_filemode: bool,
2165 blob_normalizer: "FilterBlobNormalizer | None" = None,
2166 tree_path: bytes | None = None,
2167) -> bool:
2168 """Check if a file on disk matches the expected git object.
2170 Returns True if file matches, False if it doesn't match.
2171 """
2172 # Check mode first (if honor_filemode is True)
2173 if honor_filemode:
2174 current_mode = stat.S_IMODE(current_stat.st_mode)
2175 expected_mode = stat.S_IMODE(entry_mode)
2177 # For regular files, only check the user executable bit, not group/other permissions
2178 # This matches Git's behavior where umask differences don't count as modifications
2179 if stat.S_ISREG(current_stat.st_mode):
2180 # Normalize regular file modes to ignore group/other write permissions
2181 current_mode_normalized = (
2182 current_mode & 0o755
2183 ) # Keep only user rwx and all read+execute
2184 expected_mode_normalized = expected_mode & 0o755
2186 # For Git compatibility, regular files should be either 644 or 755
2187 if expected_mode_normalized not in (0o644, 0o755):
2188 expected_mode_normalized = 0o644 # Default for regular files
2189 if current_mode_normalized not in (0o644, 0o755):
2190 # Determine if it should be executable based on user execute bit
2191 if current_mode & 0o100: # User execute bit is set
2192 current_mode_normalized = 0o755
2193 else:
2194 current_mode_normalized = 0o644
2196 if current_mode_normalized != expected_mode_normalized:
2197 return False
2198 else:
2199 # For non-regular files (symlinks, etc.), check mode exactly
2200 if current_mode != expected_mode:
2201 return False
2203 # If mode matches (or we don't care), check content via size first
2204 blob_obj = repo_object_store[entry_sha]
2205 if current_stat.st_size != blob_obj.raw_length():
2206 return False
2208 # Size matches, check actual content
2209 try:
2210 with open(full_path, "rb") as f:
2211 current_content = f.read()
2212 expected_content = blob_obj.as_raw_string()
2213 if blob_normalizer and tree_path is not None:
2214 assert isinstance(blob_obj, Blob)
2215 normalized_blob = blob_normalizer.checkout_normalize(
2216 blob_obj, tree_path
2217 )
2218 expected_content = normalized_blob.as_raw_string()
2219 return current_content == expected_content
2220 except (FileNotFoundError, PermissionError, IsADirectoryError):
2221 return False
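# The mode normalization above reduces to a simple rule for regular files:
# a file is treated as 0o755 when its user execute bit is set and 0o644
# otherwise, so umask-driven group/other differences never count as
# modifications. A minimal standalone sketch of that rule, using a
# hypothetical helper name:
#
#     def _normalize_reg_mode(mode: int) -> int:
#         return 0o755 if mode & 0o100 else 0o644
#
#     assert _normalize_reg_mode(0o775) == 0o755
#     assert _normalize_reg_mode(0o664) == 0o644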
2224def _transition_to_submodule(
2225 repo: "Repo",
2226 path: bytes,
2227 full_path: bytes,
2228 current_stat: os.stat_result | None,
2229 entry: IndexEntry | TreeEntry,
2230 index: Index,
2231) -> None:
2232 """Transition any type to submodule."""
2233 from .submodule import ensure_submodule_placeholder
2235 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2236 # Already a directory, just ensure .git file exists
2237 ensure_submodule_placeholder(repo, path)
2238 else:
2239 # Remove whatever is there and create submodule
2240 if current_stat is not None:
2241 _remove_file_with_readonly_handling(full_path)
2242 ensure_submodule_placeholder(repo, path)
2244 st = os.lstat(full_path)
2245 assert entry.sha is not None
2246 index[path] = index_entry_from_stat(st, entry.sha)
2249def _transition_to_file(
2250 object_store: "BaseObjectStore",
2251 path: bytes,
2252 full_path: bytes,
2253 current_stat: os.stat_result | None,
2254 entry: IndexEntry | TreeEntry,
2255 index: Index,
2256 honor_filemode: bool,
2257 symlink_fn: Callable[
2258 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
2259 ]
2260 | None,
2261 blob_normalizer: "FilterBlobNormalizer | None",
2262 tree_encoding: str = "utf-8",
2263) -> None:
2264 """Transition any type to regular file or symlink."""
2265 assert entry.sha is not None and entry.mode is not None
2266 # Check if we need to update
2267 if (
2268 current_stat is not None
2269 and stat.S_ISREG(current_stat.st_mode)
2270 and not stat.S_ISLNK(entry.mode)
2271 ):
2272 # File to file - check if update needed
2273 file_matches = _check_file_matches(
2274 object_store,
2275 full_path,
2276 entry.sha,
2277 entry.mode,
2278 current_stat,
2279 honor_filemode,
2280 blob_normalizer,
2281 path,
2282 )
2283 needs_update = not file_matches
2284 elif (
2285 current_stat is not None
2286 and stat.S_ISLNK(current_stat.st_mode)
2287 and stat.S_ISLNK(entry.mode)
2288 ):
2289 # Symlink to symlink - check if update needed
2290 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
2291 needs_update = not symlink_matches
2292 else:
2293 needs_update = True
2295 if not needs_update:
2296 # Just update index - current_stat should always be valid here since we're not updating
2297 assert current_stat is not None
2298 index[path] = index_entry_from_stat(current_stat, entry.sha)
2299 return
2301 # Remove existing entry if needed
2302 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2303 # Remove directory
2304 dir_contents = set(os.listdir(full_path))
2305 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2307 if git_file_name in dir_contents:
2308 if dir_contents != {git_file_name}:
2309 raise IsADirectoryError(
2310 f"Cannot replace submodule with untracked files: {full_path!r}"
2311 )
2312 shutil.rmtree(full_path)
2313 else:
2314 try:
2315 os.rmdir(full_path)
2316 except OSError as e:
2317 if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
2318 raise IsADirectoryError(
2319 f"Cannot replace non-empty directory with file: {full_path!r}"
2320 )
2321 raise
2322 elif current_stat is not None:
2323 _remove_file_with_readonly_handling(full_path)
2325 # Ensure parent directory exists
2326 _ensure_parent_dir_exists(full_path)
2328 # Write the file
2329 blob_obj = object_store[entry.sha]
2330 assert isinstance(blob_obj, Blob)
2331 if blob_normalizer:
2332 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
2333 st = build_file_from_blob(
2334 blob_obj,
2335 entry.mode,
2336 full_path,
2337 honor_filemode=honor_filemode,
2338 tree_encoding=tree_encoding,
2339 symlink_fn=symlink_fn,
2340 )
2341 index[path] = index_entry_from_stat(st, entry.sha)
2344def _transition_to_absent(
2345 repo: "Repo",
2346 path: bytes,
2347 full_path: bytes,
2348 current_stat: os.stat_result | None,
2349 index: Index,
2350) -> None:
2351 """Remove any type of entry."""
2352 if current_stat is None:
2353 return
2355 if stat.S_ISDIR(current_stat.st_mode):
2356 # Check if it's a submodule directory
2357 dir_contents = set(os.listdir(full_path))
2358 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2360 if git_file_name in dir_contents and dir_contents == {git_file_name}:
2361 shutil.rmtree(full_path)
2362 else:
2363 try:
2364 os.rmdir(full_path)
2365 except OSError as e:
2366 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
2367 raise
2368 else:
2369 _remove_file_with_readonly_handling(full_path)
2371 try:
2372 del index[path]
2373 except KeyError:
2374 pass
2376 # Try to remove empty parent directories
2377 _remove_empty_parents(
2378 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2379 )
2382def detect_case_only_renames(
2383 changes: Sequence["TreeChange"],
2384 config: "Config",
2385) -> list["TreeChange"]:
2386 """Detect and transform case-only renames in a list of tree changes.
2388 This function identifies file renames that only differ in case (e.g.,
2389 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
2390 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
2391 based on the repository configuration.
2393 Args:
2394 changes: List of TreeChange objects representing file changes
2395 config: Repository configuration object
2397 Returns:
2398 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
2399 """
2400 from .diff_tree import (
2401 CHANGE_ADD,
2402 CHANGE_COPY,
2403 CHANGE_DELETE,
2404 CHANGE_MODIFY,
2405 CHANGE_RENAME,
2406 TreeChange,
2407 )
2409 # Build dictionaries of old and new paths with their normalized forms
2410 old_paths_normalized = {}
2411 new_paths_normalized = {}
2412 old_changes = {} # Map from old path to change object
2413 new_changes = {} # Map from new path to change object
2415 # Get the appropriate normalizer based on config
2416 normalize_func = get_path_element_normalizer(config)
2418 def normalize_path(path: bytes) -> bytes:
2419 """Normalize entire path using element normalization."""
2420 return b"/".join(normalize_func(part) for part in path.split(b"/"))
2422 # Pre-normalize all paths once to avoid repeated normalization
2423 for change in changes:
2424 if change.type == CHANGE_DELETE and change.old:
2425 assert change.old.path is not None
2426 try:
2427 normalized = normalize_path(change.old.path)
2428 except UnicodeDecodeError:
2429 import logging
2431 logging.warning(
2432 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2433 change.old.path,
2434 )
2435 else:
2436 old_paths_normalized[normalized] = change.old.path
2437 old_changes[change.old.path] = change
2438 elif change.type == CHANGE_RENAME and change.old:
2439 assert change.old.path is not None
2440 # Treat RENAME as DELETE + ADD for case-only detection
2441 try:
2442 normalized = normalize_path(change.old.path)
2443 except UnicodeDecodeError:
2444 import logging
2446 logging.warning(
2447 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2448 change.old.path,
2449 )
2450 else:
2451 old_paths_normalized[normalized] = change.old.path
2452 old_changes[change.old.path] = change
2454 if (
2455 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
2456 and change.new
2457 ):
2458 assert change.new.path is not None
2459 try:
2460 normalized = normalize_path(change.new.path)
2461 except UnicodeDecodeError:
2462 import logging
2464 logging.warning(
2465 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2466 change.new.path,
2467 )
2468 else:
2469 new_paths_normalized[normalized] = change.new.path
2470 new_changes[change.new.path] = change
2472 # Find case-only renames and transform changes
2473 case_only_renames = set()
2474 new_rename_changes = []
2476 for norm_path, old_path in old_paths_normalized.items():
2477 if norm_path in new_paths_normalized:
2478 new_path = new_paths_normalized[norm_path]
2479 if old_path != new_path:
2480 # Found a case-only rename
2481 old_change = old_changes[old_path]
2482 new_change = new_changes[new_path]
2484 # Create a CHANGE_RENAME to replace the DELETE and ADD/MODIFY pair,
2485 # pairing the old file from the DELETE with the new file from the
2486 # ADD (simple case) or MODIFY (complex case); both branches build
2487 # the same change object
2488 rename_change = TreeChange(
2489 CHANGE_RENAME, old_change.old, new_change.new
2490 )
2497 new_rename_changes.append(rename_change)
2499 # Mark the old changes for removal
2500 case_only_renames.add(old_change)
2501 case_only_renames.add(new_change)
2503 # Return new list with original ADD/DELETE changes replaced by renames
2504 result = [change for change in changes if change not in case_only_renames]
2505 result.extend(new_rename_changes)
2506 return result
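# Example (illustrative): on a case-insensitive filesystem, a DELETE of
# b"README.txt" paired with an ADD of b"readme.txt" in the same change
# list comes back as a single CHANGE_RENAME. Sketch, where raw_changes is
# a hypothetical iterable of TreeChange objects:
#
#     from dulwich.repo import Repo
#     repo = Repo("/tmp/repo")
#     changes = detect_case_only_renames(list(raw_changes), repo.get_config())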
2509def update_working_tree(
2510 repo: "Repo",
2511 old_tree_id: bytes | None,
2512 new_tree_id: bytes,
2513 change_iterator: Iterator["TreeChange"],
2514 honor_filemode: bool = True,
2515 validate_path_element: Callable[[bytes], bool] | None = None,
2516 symlink_fn: Callable[
2517 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
2518 ]
2519 | None = None,
2520 force_remove_untracked: bool = False,
2521 blob_normalizer: "FilterBlobNormalizer | None" = None,
2522 tree_encoding: str = "utf-8",
2523 allow_overwrite_modified: bool = False,
2524) -> None:
2525 """Update the working tree and index to match a new tree.
2527 This function handles:
2528 - Adding new files
2529 - Updating modified files
2530 - Removing deleted files
2531 - Cleaning up empty directories
2533 Args:
2534 repo: Repository object
2535 old_tree_id: SHA of the tree before the update
2536 new_tree_id: SHA of the tree to update to
2537 change_iterator: Iterator of TreeChange objects to apply
2538 honor_filemode: An optional flag to honor core.filemode setting
2539 validate_path_element: Function to validate path elements to check out
2540 symlink_fn: Function to use for creating symlinks
2541 force_remove_untracked: If True, remove files that exist in working
2542 directory but not in target tree, even if old_tree_id is None
2543 blob_normalizer: An optional BlobNormalizer to use for converting line
2544 endings when writing blobs to the working directory.
2545 tree_encoding: Encoding used for tree paths (default: utf-8)
2546 allow_overwrite_modified: If False, raise an error when attempting to
2547 overwrite files that have been modified compared to old_tree_id
2548 """
2549 if validate_path_element is None:
2550 validate_path_element = validate_path_element_default
2552 from .diff_tree import (
2553 CHANGE_ADD,
2554 CHANGE_COPY,
2555 CHANGE_DELETE,
2556 CHANGE_MODIFY,
2557 CHANGE_RENAME,
2558 CHANGE_UNCHANGED,
2559 )
2561 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2562 index = repo.open_index()
2564 # Convert iterator to list since we need multiple passes
2565 changes = list(change_iterator)
2567 # Transform case-only renames on case-insensitive filesystems
2568 import platform
2570 default_ignore_case = platform.system() in ("Windows", "Darwin")
2571 config = repo.get_config()
2572 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
2574 if ignore_case:
2576 changes = detect_case_only_renames(changes, config)
2578 # Check for path conflicts where files need to become directories
2579 paths_becoming_dirs = set()
2580 for change in changes:
2581 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
2582 assert change.new is not None
2583 path = change.new.path
2584 assert path is not None
2585 if b"/" in path: # This is a file inside a directory
2586 # Check if any parent path exists as a file in the old tree or changes
2587 parts = path.split(b"/")
2588 for i in range(1, len(parts)):
2589 parent = b"/".join(parts[:i])
2590 # See if this parent path is being deleted (was a file, becoming a dir)
2591 for other_change in changes:
2592 if (
2593 other_change.type == CHANGE_DELETE
2594 and other_change.old
2595 and other_change.old.path == parent
2596 ):
2597 paths_becoming_dirs.add(parent)
2599 # Check if any path that needs to become a directory has been modified
2600 for path in paths_becoming_dirs:
2601 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2602 try:
2603 current_stat = os.lstat(full_path)
2604 except FileNotFoundError:
2605 continue # File doesn't exist, nothing to check
2606 except OSError as e:
2607 raise OSError(
2608 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2609 ) from e
2611 if stat.S_ISREG(current_stat.st_mode):
2612 # Find the old entry for this path
2613 old_change = None
2614 for change in changes:
2615 if (
2616 change.type == CHANGE_DELETE
2617 and change.old
2618 and change.old.path == path
2619 ):
2620 old_change = change
2621 break
2623 if old_change:
2624 # Check if file has been modified
2625 assert old_change.old is not None
2626 assert (
2627 old_change.old.sha is not None and old_change.old.mode is not None
2628 )
2629 file_matches = _check_file_matches(
2630 repo.object_store,
2631 full_path,
2632 old_change.old.sha,
2633 old_change.old.mode,
2634 current_stat,
2635 honor_filemode,
2636 blob_normalizer,
2637 path,
2638 )
2639 if not file_matches:
2640 raise OSError(
2641 f"Cannot replace modified file with directory: {path!r}"
2642 )
2644 # Check for uncommitted modifications before making any changes
2645 if not allow_overwrite_modified and old_tree_id:
2646 for change in changes:
2647 # Only check files that are being modified or deleted
2648 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
2649 path = change.old.path
2650 assert path is not None
2651 if path.startswith(b".git") or not validate_path(
2652 path, validate_path_element
2653 ):
2654 continue
2656 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2657 try:
2658 current_stat = os.lstat(full_path)
2659 except FileNotFoundError:
2660 continue # File doesn't exist, nothing to check
2661 except OSError as e:
2662 raise OSError(
2663 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2664 ) from e
2666 if stat.S_ISREG(current_stat.st_mode):
2667 # Check if working tree file differs from old tree
2668 assert change.old.sha is not None and change.old.mode is not None
2669 file_matches = _check_file_matches(
2670 repo.object_store,
2671 full_path,
2672 change.old.sha,
2673 change.old.mode,
2674 current_stat,
2675 honor_filemode,
2676 blob_normalizer,
2677 path,
2678 )
2679 if not file_matches:
2680 from .errors import WorkingTreeModifiedError
2682 raise WorkingTreeModifiedError(
2683 f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
2684 f"would be overwritten by checkout. "
2685 f"Please commit your changes or stash them before you switch branches."
2686 )
2688 # Apply the changes
2689 for change in changes:
2690 if change.type in (CHANGE_DELETE, CHANGE_RENAME):
2691 # Remove file/directory
2692 assert change.old is not None and change.old.path is not None
2693 path = change.old.path
2694 if path.startswith(b".git") or not validate_path(
2695 path, validate_path_element
2696 ):
2697 continue
2699 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2700 try:
2701 delete_stat: os.stat_result | None = os.lstat(full_path)
2702 except FileNotFoundError:
2703 delete_stat = None
2704 except OSError as e:
2705 raise OSError(
2706 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2707 ) from e
2709 _transition_to_absent(repo, path, full_path, delete_stat, index)
2711 if change.type in (
2712 CHANGE_ADD,
2713 CHANGE_MODIFY,
2714 CHANGE_UNCHANGED,
2715 CHANGE_COPY,
2716 CHANGE_RENAME,
2717 ):
2718 # Add or modify file
2719 assert (
2720 change.new is not None
2721 and change.new.path is not None
2722 and change.new.mode is not None
2723 )
2724 path = change.new.path
2725 if path.startswith(b".git") or not validate_path(
2726 path, validate_path_element
2727 ):
2728 continue
2730 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2731 try:
2732 modify_stat: os.stat_result | None = os.lstat(full_path)
2733 except FileNotFoundError:
2734 modify_stat = None
2735 except OSError as e:
2736 raise OSError(
2737 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2738 ) from e
2740 if S_ISGITLINK(change.new.mode):
2741 _transition_to_submodule(
2742 repo, path, full_path, modify_stat, change.new, index
2743 )
2744 else:
2745 _transition_to_file(
2746 repo.object_store,
2747 path,
2748 full_path,
2749 modify_stat,
2750 change.new,
2751 index,
2752 honor_filemode,
2753 symlink_fn,
2754 blob_normalizer,
2755 tree_encoding,
2756 )
2758 index.write()
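# Example (illustrative sketch): applying a checkout from one tree to
# another, with the change iterator produced by dulwich.diff_tree; the
# tree ids are hypothetical.
#
#     from dulwich.diff_tree import tree_changes
#     changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
#     update_working_tree(repo, old_tree_id, new_tree_id, changes)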
2761def _stat_matches_entry(st: os.stat_result, entry: IndexEntry) -> bool:
2762 """Check if filesystem stat matches index entry stat.
2764 This is used to determine if a file might have changed without reading its content.
2765 Git uses this optimization to avoid expensive filter operations on unchanged files.
2767 Args:
2768 st: Filesystem stat result
2769 entry: Index entry to compare against
2770 Returns: True if stat matches and file is likely unchanged
2771 """
2772 # Get entry mtime
2773 if isinstance(entry.mtime, tuple):
2774 entry_mtime_sec = entry.mtime[0]
2775 else:
2776 entry_mtime_sec = int(entry.mtime)
2778 # Compare modification time (seconds only for now)
2779 # Note: We use int() to compare only seconds, as nanosecond precision
2780 # can vary across filesystems
2781 if int(st.st_mtime) != entry_mtime_sec:
2782 return False
2784 # Compare file size
2785 if st.st_size != entry.size:
2786 return False
2788 # If both mtime and size match, file is likely unchanged
2789 return True
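# Example (illustrative): callers use this as a cheap guard so that
# unchanged files are never read, hashed, or run through filters.
#
#     if _stat_matches_entry(os.lstat(full_path), entry):
#         pass  # likely unchanged; skip the expensive content check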
2792def _check_entry_for_changes(
2793 tree_path: bytes,
2794 entry: IndexEntry | ConflictedIndexEntry,
2795 root_path: bytes,
2796 filter_blob_callback: Callable[[bytes, bytes], bytes] | None = None,
2797) -> bytes | None:
2798 """Check a single index entry for changes.
2800 Args:
2801 tree_path: Path in the tree
2802 entry: Index entry to check
2803 root_path: Root filesystem path
2804 filter_blob_callback: Optional callback to filter blobs
2805 Returns: tree_path if changed, None otherwise
2806 """
2807 if isinstance(entry, ConflictedIndexEntry):
2808 # Conflicted files are always unstaged
2809 return tree_path
2811 full_path = _tree_to_fs_path(root_path, tree_path)
2812 try:
2813 st = os.lstat(full_path)
2814 if stat.S_ISDIR(st.st_mode):
2815 if _has_directory_changed(tree_path, entry):
2816 return tree_path
2817 return None
2819 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
2820 return None
2822 # Optimization: If stat matches index entry (mtime and size unchanged),
2823 # we can skip reading and filtering the file entirely. This is a significant
2824 # performance improvement for repositories with many unchanged files.
2825 # Even with filters (e.g., LFS), if the file hasn't been modified (stat unchanged),
2826 # the filter output would be the same, so we can safely skip the expensive
2827 # filter operation. This addresses performance issues with LFS repositories
2828 # where filter operations can be very slow.
2829 if _stat_matches_entry(st, entry):
2830 return None
2832 blob = blob_from_path_and_stat(full_path, st)
2834 if filter_blob_callback is not None:
2835 blob.data = filter_blob_callback(blob.data, tree_path)
2836 except FileNotFoundError:
2837 # The file was removed, so we assume that counts as
2838 # different from whatever file used to exist.
2839 return tree_path
2840 else:
2841 if blob.id != entry.sha:
2842 return tree_path
2843 return None
2846def get_unstaged_changes(
2847 index: Index,
2848 root_path: str | bytes,
2849 filter_blob_callback: Callable[..., Any] | None = None,
2850 preload_index: bool = False,
2851) -> Generator[bytes, None, None]:
2852 """Walk through an index and check for differences against working tree.
2854 Args:
2855 index: index to check
2856 root_path: path in which to find files
2857 filter_blob_callback: Optional callback to filter blobs
2858 preload_index: If True, use parallel threads to check files (requires threading support)
2859 Returns: iterator over paths with unstaged changes
2860 """
2861 # For each entry in the index, check the file on disk and yield the path if it differs
2862 if not isinstance(root_path, bytes):
2863 root_path = os.fsencode(root_path)
2865 if preload_index:
2866 # Use parallel processing for better performance on slow filesystems
2867 try:
2868 import multiprocessing
2869 from concurrent.futures import ThreadPoolExecutor
2870 except ImportError:
2871 # If threading is not available, fall back to serial processing
2872 preload_index = False
2873 else:
2874 # Collect all entries first
2875 entries = list(index.iteritems())
2877 # Use number of CPUs but cap at 8 threads to avoid overhead
2878 num_workers = min(multiprocessing.cpu_count(), 8)
2880 # Process entries in parallel
2881 with ThreadPoolExecutor(max_workers=num_workers) as executor:
2882 # Submit all tasks
2883 futures = [
2884 executor.submit(
2885 _check_entry_for_changes,
2886 tree_path,
2887 entry,
2888 root_path,
2889 filter_blob_callback,
2890 )
2891 for tree_path, entry in entries
2892 ]
2894 # Yield results in submission order as each future finishes
2895 for future in futures:
2896 result = future.result()
2897 if result is not None:
2898 yield result
2900 if not preload_index:
2901 # Serial processing
2902 for tree_path, entry in index.iteritems():
2903 result = _check_entry_for_changes(
2904 tree_path, entry, root_path, filter_blob_callback
2905 )
2906 if result is not None:
2907 yield result
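# Example (illustrative): listing paths with unstaged changes; the repo
# path is hypothetical.
#
#     from dulwich.repo import Repo
#     repo = Repo("/tmp/repo")
#     for path in get_unstaged_changes(repo.open_index(), repo.path):
#         print(path.decode("utf-8", "replace"))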
2910def _tree_to_fs_path(
2911 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
2912) -> bytes:
2913 """Convert a git tree path to a file system path.
2915 Args:
2916 root_path: Root filesystem path
2917 tree_path: Git tree path as bytes (encoded with tree_encoding)
2918 tree_encoding: Encoding used for tree paths (default: utf-8)
2920 Returns: File system path.
2921 """
2922 assert isinstance(tree_path, bytes)
2923 if os_sep_bytes != b"/":
2924 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
2925 else:
2926 sep_corrected_path = tree_path
2928 # On Windows, we need to handle tree path encoding properly
2929 if sys.platform == "win32":
2930 # Decode from tree encoding, then re-encode for filesystem
2931 try:
2932 tree_path_str = sep_corrected_path.decode(tree_encoding)
2933 sep_corrected_path = os.fsencode(tree_path_str)
2934 except UnicodeDecodeError:
2935 # If decoding fails, use the original bytes
2936 pass
2938 return os.path.join(root_path, sep_corrected_path)
2941def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes:
2942 """Convert a file system path to a git tree path.
2944 Args:
2945 fs_path: File system path.
2946 tree_encoding: Encoding to use for tree paths (default: utf-8)
2948 Returns: Git tree path as bytes (encoded with tree_encoding)
2949 """
2950 if not isinstance(fs_path, bytes):
2951 fs_path_bytes = os.fsencode(fs_path)
2952 else:
2953 fs_path_bytes = fs_path
2955 # On Windows, we need to ensure tree paths are properly encoded
2956 if sys.platform == "win32":
2957 try:
2958 # Decode from filesystem encoding, then re-encode with tree encoding
2959 fs_path_str = os.fsdecode(fs_path_bytes)
2960 fs_path_bytes = fs_path_str.encode(tree_encoding)
2961 except UnicodeDecodeError:
2962 # If filesystem decoding fails, use the original bytes
2963 pass
2965 if os_sep_bytes != b"/":
2966 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
2967 else:
2968 tree_path = fs_path_bytes
2969 return tree_path
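# Example (illustrative): the two helpers translate between the "/"
# separated tree form and the platform form; on POSIX both directions are
# a no-op apart from the root join.
#
#     fs = _tree_to_fs_path(b"/tmp/repo", b"docs/readme.md")
#     tree = _fs_to_tree_path(b"docs" + os_sep_bytes + b"readme.md")
#     assert tree == b"docs/readme.md"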
2972def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None:
2973 """Create an index entry for a directory.
2975 This is only used for submodules (directories containing .git).
2977 Args:
2978 st: Stat result for the directory
2979 path: Path to the directory
2981 Returns:
2982 IndexEntry for a submodule, or None if not a submodule
2983 """
2984 if os.path.exists(os.path.join(path, b".git")):
2985 head = read_submodule_head(path)
2986 if head is None:
2987 return None
2988 return index_entry_from_stat(st, head, mode=S_IFGITLINK)
2989 return None
2992def index_entry_from_path(
2993 path: bytes, object_store: ObjectContainer | None = None
2994) -> IndexEntry | None:
2995 """Create an index from a filesystem path.
2997 This returns an index value for files, symlinks
2998 and tree references. for directories and
2999 non-existent files it returns None
3001 Args:
3002 path: Path to create an index entry for
3003 object_store: Optional object store to
3004 save new blobs in
3005 Returns: An index entry; None for directories
3006 """
3007 assert isinstance(path, bytes)
3008 st = os.lstat(path)
3009 if stat.S_ISDIR(st.st_mode):
3010 return index_entry_from_directory(st, path)
3012 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
3013 blob = blob_from_path_and_stat(path, st)
3014 if object_store is not None:
3015 object_store.add_object(blob)
3016 return index_entry_from_stat(st, blob.id)
3018 return None
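# Example (illustrative, hypothetical paths): staging a single file by
# hand; passing an object store also writes the new blob to it.
#
#     entry = index_entry_from_path(b"/tmp/repo/README", repo.object_store)
#     if entry is not None:
#         index[b"README"] = entry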
3021def iter_fresh_entries(
3022 paths: Iterable[bytes],
3023 root_path: bytes,
3024 object_store: ObjectContainer | None = None,
3025) -> Iterator[tuple[bytes, IndexEntry | None]]:
3026 """Iterate over current versions of index entries on disk.
3028 Args:
3029 paths: Paths to iterate over
3030 root_path: Root path to access from
3031 object_store: Optional store to save new blobs in
3032 Returns: Iterator over path, index_entry
3033 """
3034 for path in paths:
3035 p = _tree_to_fs_path(root_path, path)
3036 try:
3037 entry = index_entry_from_path(p, object_store=object_store)
3038 except (FileNotFoundError, IsADirectoryError):
3039 entry = None
3040 yield path, entry
3043def iter_fresh_objects(
3044 paths: Iterable[bytes],
3045 root_path: bytes,
3046 include_deleted: bool = False,
3047 object_store: ObjectContainer | None = None,
3048) -> Iterator[tuple[bytes, bytes | None, int | None]]:
3049 """Iterate over versions of objects on disk referenced by index.
3051 Args:
3052 paths: Paths to check
3053 root_path: Root path to access from
3054 include_deleted: Include deleted entries with sha and
3055 mode set to None
3056 object_store: Optional object store to report new items to
3057 Returns: Iterator over path, sha, mode
3058 """
3059 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
3060 if entry is None:
3061 if include_deleted:
3062 yield path, None, None
3063 else:
3064 yield path, entry.sha, cleanup_mode(entry.mode)
3067def refresh_index(index: Index, root_path: bytes) -> None:
3068 """Refresh the contents of an index.
3070 This is the equivalent of the refresh performed by 'git commit -a'.
3072 Args:
3073 index: Index to update
3074 root_path: Root filesystem path
3075 """
3076 for path, entry in iter_fresh_entries(index, root_path):
3077 if entry:
3078 index[path] = entry
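# Example (illustrative): refreshing all index entries from disk and
# persisting the result.
#
#     index = repo.open_index()
#     refresh_index(index, os.fsencode(repo.path))
#     index.write()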
3081class locked_index:
3082 """Lock the index while making modifications.
3084 Works as a context manager.
3085 """
3087 _file: "_GitFile"
3089 def __init__(self, path: bytes | str) -> None:
3090 """Initialize locked_index."""
3091 self._path = path
3093 def __enter__(self) -> Index:
3094 """Enter context manager and lock index."""
3095 f = GitFile(self._path, "wb")
3096 self._file = f
3097 self._index = Index(self._path)
3098 return self._index
3100 def __exit__(
3101 self,
3102 exc_type: type | None,
3103 exc_value: BaseException | None,
3104 traceback: types.TracebackType | None,
3105 ) -> None:
3106 """Exit context manager and unlock index."""
3107 if exc_type is not None:
3108 self._file.abort()
3109 return
3110 try:
3111 f = SHA1Writer(self._file)
3112 write_index_dict(f, self._index._byname)
3113 except BaseException:
3114 self._file.abort()
3115 else:
3116 f.close()
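# Example (illustrative, hypothetical path): locked_index takes the usual
# index.lock, hands back the parsed Index, and atomically rewrites the
# file on a clean exit (aborting the lock on error).
#
#     with locked_index(b"/tmp/repo/.git/index") as index:
#         del index[b"obsolete.txt"]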