# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import (
    Callable,
    Generator,
    Iterable,
    Iterator,
    Mapping,
    Sequence,
    Set,
)
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Optional,
    Union,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, Union["TreeDict", tuple[int, bytes]]]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
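

# Illustrative round-trip of the varint helpers above (values worked out by
# hand, not taken from the original source): 300 encodes to two bytes, the
# first carrying the low 7 bits plus a continuation flag.
#
#   >>> _encode_varint(300)
#   b'\xac\x02'
#   >>> _decode_varint(b'\xac\x02')
#   (300, 2)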


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
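

# Illustrative example (not part of the original source): compressing
# b"dir/b.txt" against b"dir/a.txt" keeps the 4-byte common prefix "dir/",
# so the varint says "drop 5 bytes from the previous path" and the new
# suffix follows, NUL-terminated.
#
#   >>> _compress_path(b"dir/b.txt", b"dir/a.txt")
#   b'\x05b.txt\x00'
#   >>> _decompress_path(b'\x05b.txt\x00', 0, b"dir/a.txt")
#   (b'dir/b.txt', 7)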


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
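

# Illustrative example (not in the original source): the stage lives in bits
# 12-13 of the flags word, so a flags value of 0x2000 decodes to stage 2,
# the "this" side of a merge conflict.
#
#   >>> Stage((0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#   <Stage.MERGE_CONFLICT_THIS: 2>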


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that don't understand sparse index should avoid interacting with
    the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )
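

# Illustrative example (not in the original source; the all-zero sha is a
# placeholder): toggling skip-worktree on a minimal entry also raises
# FLAG_EXTENDED, which forces index format version >= 3 when writing.
#
#   >>> e = IndexEntry(ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
#   ...                uid=0, gid=0, size=0, sha=b"0" * 40)
#   >>> e.set_skip_worktree(True)
#   >>> e.skip_worktree, bool(e.flags & FLAG_EXTENDED)
#   (True, True)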


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
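

# Illustrative behaviour of the path helpers above (not in the original
# source): pathsplit is the /-delimited analogue of os.path.split, and
# pathjoin skips empty components.
#
#   >>> pathsplit(b"foo/bar/baz")
#   (b'foo/bar', b'baz')
#   >>> pathsplit(b"baz")
#   (b'', b'baz')
#   >>> pathjoin(b"", b"foo", b"baz")
#   b'foo/baz'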


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
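

# Illustrative example (not in the original source): a float timestamp is
# split into big-endian seconds and nanoseconds, 8 bytes total.
#
#   >>> import io
#   >>> buf = io.BytesIO()
#   >>> write_cache_time(buf, 1.5)
#   >>> struct.unpack(">LL", buf.getvalue())
#   (1, 500000000)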


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to the actual filename
        # length; this matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
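

# Illustrative example (not in the original source): a minimal v2 header is
# the b"DIRC" magic followed by two big-endian 32-bit words for the version
# and the entry count.
#
#   >>> import io
#   >>> read_index_header(io.BytesIO(b"DIRC" + struct.pack(">LL", 2, 0)))
#   (2, 0)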


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    The dictionary is keyed by path; entries with a non-zero stage are
    grouped into a single ConflictedIndexEntry per path.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Iterable over the entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # If any entry uses extended flags, the index must be at least version 3.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    if version < 3:
        # Final safety check: extended flags cannot be represented below v3.
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are written sorted by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
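

# Illustrative example (not in the original source): cleanup_mode normalizes
# permission bits so only 0o100644 / 0o100755 survive for regular files; any
# owner-execute bit promotes the whole mode to 755.
#
#   >>> oct(cleanup_mode(0o100664))
#   '0o100644'
#   >>> oct(cleanup_mode(0o100744))
#   '0o100755'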


class Index:
    """A Git Index file."""

    _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]

    def __init__(
        self,
        filename: bytes | str | os.PathLike[str],
        read: bool = True,
        skip_hash: bool = False,
        version: int | None = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> bytes | str:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: IndexEntry | ConflictedIndexEntry
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[bytes | None, bytes | None],
            tuple[int | None, int | None],
            tuple[bytes | None, bytes | None],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: bytes,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=subtree_sha,
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> bytes | None:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id
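

# Illustrative usage of the Index class above (the path is an assumption,
# not taken from the original source): open an existing index and walk its
# entries.
#
#   >>> idx = Index(".git/index")
#   >>> for path in idx:
#   ...     print(path, idx[path].sha)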


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
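

# Illustrative example (not in the original source; a sketch assuming
# dulwich's MemoryObjectStore and Blob.from_string APIs): commit_tree builds
# nested Tree objects from flat (path, sha, mode) triples.
#
#   >>> from dulwich.object_store import MemoryObjectStore
#   >>> store = MemoryObjectStore()
#   >>> blob = Blob.from_string(b"hello\n")
#   >>> store.add_object(blob)
#   >>> tree_id = commit_tree(store, [(b"dir/hello.txt", blob.id, 0o100644)])
#   >>> isinstance(store[tree_id], Tree)
#   True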


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: bytes | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )
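

# Illustrative example (hypothetical path and placeholder sha, not from the
# original source): build an IndexEntry for a working-tree file from its
# lstat result; a regular rw-r--r-- file yields mode 0o100644 (33188).
#
#   >>> st = os.lstat("README.md")
#   >>> index_entry_from_stat(st, b"0" * 40).mode
#   33188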


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: str | None) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: str | bytes,
        dst: str | bytes,
        target_is_directory: bool = False,
        *,
        dir_fd: int | None = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
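

# Illustrative example (not in the original source): the validators reject
# any path with a ".git" component, including the NTFS 8.3 short-name alias
# "git~1".
#
#   >>> validate_path(b"src/main.py")
#   True
#   >>> validate_path(b".git/config")
#   False
#   >>> validate_path(b"GIT~1/config", validate_path_element_ntfs)
#   False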


def build_index_from_tree(
    root_path: str | bytes,
    index_path: str | bytes,
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    blob_normalizer: Optional["FilterBlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        assert (
            entry.path is not None and entry.mode is not None and entry.sha is not None
        )
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we cannot use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
1974def blob_from_path_and_mode(
1975 fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
1976) -> Blob:
1977 """Create a blob from a path and a stat object.
1979 Args:
1980 fs_path: Full file system path to file
1981 mode: File mode
1982 tree_encoding: Encoding to use for tree contents
1983 Returns: A `Blob` object
1984 """
1985 assert isinstance(fs_path, bytes)
1986 blob = Blob()
1987 if stat.S_ISLNK(mode):
1988 if sys.platform == "win32":
1989 # os.readlink on Python3 on Windows requires a unicode string.
1990 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
1991 else:
1992 blob.data = os.readlink(fs_path)
1993 else:
1994 with open(fs_path, "rb") as f:
1995 blob.data = f.read()
1996 return blob
1999def blob_from_path_and_stat(
2000 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
2001) -> Blob:
2002 """Create a blob from a path and a stat object.
2004 Args:
2005 fs_path: Full file system path to file
2006 st: A stat object
2007 tree_encoding: Encoding used for symlink targets (on Windows)
2008 Returns: A `Blob` object
2009 """
2010 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
2013def read_submodule_head(path: str | bytes) -> bytes | None:
2014 """Read the head commit of a submodule.
2016 Args:
2017 path: path to the submodule
2018 Returns: HEAD sha, None if not a valid head/repository
2019 """
2020 from .errors import NotGitRepository
2021 from .repo import Repo
2023 # Repo currently expects a "str", so decode if necessary.
2024 # TODO(jelmer): Perhaps move this into Repo() ?
2025 if not isinstance(path, str):
2026 path = os.fsdecode(path)
2027 try:
2028 repo = Repo(path)
2029 except NotGitRepository:
2030 return None
2031 try:
2032 return repo.head()
2033 except KeyError:
2034 return None
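# A hedged sketch of interpreting read_submodule_head results; the path
# b"vendor/lib" is a made-up example submodule location.
def _example_submodule_state(path: bytes = b"vendor/lib") -> str:
    head = read_submodule_head(path)
    if head is None:
        # Not a git repository, or it has no HEAD commit yet
        return "uninitialized"
    return "checked out at " + head.decode("ascii")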
2037def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
2038 """Check if a directory has changed after getting an error.
2040 Call this function when an error occurs while trying to create a blob
2041 from a path. It checks whether the path is a directory; if it is a
2042 directory and a submodule, the submodule head is checked to see if it
2043 has changed. Otherwise the file is considered changed, since Git was
2044 tracking a file and not a directory.
2046 Returns True if the given path should be considered changed, and False
2047 otherwise or if the path is not a directory.
2048 """
2049 # This is actually a directory
2050 if os.path.exists(os.path.join(tree_path, b".git")):
2051 # Submodule
2052 head = read_submodule_head(tree_path)
2053 if entry.sha != head:
2054 return True
2055 else:
2056 # The file was changed to a directory, so consider it removed.
2057 return True
2059 return False
2062os_sep_bytes = os.sep.encode("ascii")
2065def _ensure_parent_dir_exists(full_path: bytes) -> None:
2066 """Ensure parent directory exists, checking no parent is a file."""
2067 parent_dir = os.path.dirname(full_path)
2068 if parent_dir and not os.path.exists(parent_dir):
2069 # Walk up the directory tree to find the first existing parent
2070 current = parent_dir
2071 parents_to_check: list[bytes] = []
2073 while current and not os.path.exists(current):
2074 parents_to_check.insert(0, current)
2075 new_parent = os.path.dirname(current)
2076 if new_parent == current:
2077 # Reached the root or can't go up further
2078 break
2079 current = new_parent
2081 # Check if the existing parent (if any) is a directory
2082 if current and os.path.exists(current) and not os.path.isdir(current):
2083 raise OSError(
2084 f"Cannot create directory, parent path is a file: {current!r}"
2085 )
2087 # Now check each parent we need to create isn't blocked by an existing file
2088 for parent_path in parents_to_check:
2089 if os.path.exists(parent_path) and not os.path.isdir(parent_path):
2090 raise OSError(
2091 f"Cannot create directory, parent path is a file: {parent_path!r}"
2092 )
2094 os.makedirs(parent_dir)
2097def _remove_file_with_readonly_handling(path: bytes) -> None:
2098 """Remove a file, handling read-only files on Windows.
2100 Args:
2101 path: Path to the file to remove
2102 """
2103 try:
2104 os.unlink(path)
2105 except PermissionError:
2106 # On Windows, remove read-only attribute and retry
2107 if sys.platform == "win32":
2108 os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
2109 os.unlink(path)
2110 else:
2111 raise
2114def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
2115 """Remove empty parent directories up to stop_at."""
2116 parent = os.path.dirname(path)
2117 while parent and parent != stop_at:
2118 try:
2119 os.rmdir(parent)
2120 parent = os.path.dirname(parent)
2121 except FileNotFoundError:
2122 # Directory doesn't exist - stop trying
2123 break
2124 except OSError as e:
2125 if e.errno == errno.ENOTEMPTY:
2126 # Directory not empty - stop trying
2127 break
2128 raise
2131def _check_symlink_matches(
2132 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
2133) -> bool:
2134 """Check if symlink target matches expected target.
2136 Returns True if symlink matches, False if it doesn't match.
2137 """
2138 try:
2139 current_target = os.readlink(full_path)
2140 blob_obj = repo_object_store[entry_sha]
2141 expected_target = blob_obj.as_raw_string()
2142 if isinstance(current_target, str):
2143 current_target = current_target.encode()
2144 return current_target == expected_target
2145 except FileNotFoundError:
2146 # Symlink doesn't exist
2147 return False
2148 except OSError as e:
2149 if e.errno == errno.EINVAL:
2150 # Not a symlink
2151 return False
2152 raise
2155def _check_file_matches(
2156 repo_object_store: "BaseObjectStore",
2157 full_path: bytes,
2158 entry_sha: bytes,
2159 entry_mode: int,
2160 current_stat: os.stat_result,
2161 honor_filemode: bool,
2162 blob_normalizer: Optional["FilterBlobNormalizer"] = None,
2163 tree_path: bytes | None = None,
2164) -> bool:
2165 """Check if a file on disk matches the expected git object.
2167 Returns True if file matches, False if it doesn't match.
2168 """
2169 # Check mode first (if honor_filemode is True)
2170 if honor_filemode:
2171 current_mode = stat.S_IMODE(current_stat.st_mode)
2172 expected_mode = stat.S_IMODE(entry_mode)
2174 # For regular files, only check the user executable bit, not group/other permissions
2175 # This matches Git's behavior where umask differences don't count as modifications
2176 if stat.S_ISREG(current_stat.st_mode):
2177 # Normalize regular file modes to ignore group/other write permissions
2178 current_mode_normalized = (
2179 current_mode & 0o755
2180 ) # Keep only user rwx and all read+execute
2181 expected_mode_normalized = expected_mode & 0o755
2183 # For Git compatibility, regular files should be either 644 or 755
2184 if expected_mode_normalized not in (0o644, 0o755):
2185 expected_mode_normalized = 0o644 # Default for regular files
2186 if current_mode_normalized not in (0o644, 0o755):
2187 # Determine if it should be executable based on user execute bit
2188 if current_mode & 0o100: # User execute bit is set
2189 current_mode_normalized = 0o755
2190 else:
2191 current_mode_normalized = 0o644
2193 if current_mode_normalized != expected_mode_normalized:
2194 return False
2195 else:
2196 # For non-regular files (symlinks, etc.), check mode exactly
2197 if current_mode != expected_mode:
2198 return False
2200 # If mode matches (or we don't care), check content via size first
2201 blob_obj = repo_object_store[entry_sha]
2202 if current_stat.st_size != blob_obj.raw_length():
2203 return False
2205 # Size matches, check actual content
2206 try:
2207 with open(full_path, "rb") as f:
2208 current_content = f.read()
2209 expected_content = blob_obj.as_raw_string()
2210 if blob_normalizer and tree_path is not None:
2211 assert isinstance(blob_obj, Blob)
2212 normalized_blob = blob_normalizer.checkout_normalize(
2213 blob_obj, tree_path
2214 )
2215 expected_content = normalized_blob.as_raw_string()
2216 return current_content == expected_content
2217 except (FileNotFoundError, PermissionError, IsADirectoryError):
2218 return False
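# A worked illustration of the mode normalization above: umask-style
# variations of a regular file collapse to 0o644 or 0o755, so only the
# user execute bit decides between "file" and "executable". This helper
# is a standalone restatement for clarity, not a dulwich function.
def _example_normalize_mode(mode: int) -> int:
    normalized = mode & 0o755  # drop group/other write bits
    if normalized not in (0o644, 0o755):
        normalized = 0o755 if mode & 0o100 else 0o644
    return normalized


assert _example_normalize_mode(0o664) == 0o644  # group-writable file
assert _example_normalize_mode(0o775) == 0o755  # group-writable executable
assert _example_normalize_mode(0o600) == 0o644  # private file, still "644"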
2221def _transition_to_submodule(
2222 repo: "Repo",
2223 path: bytes,
2224 full_path: bytes,
2225 current_stat: os.stat_result | None,
2226 entry: IndexEntry | TreeEntry,
2227 index: Index,
2228) -> None:
2229 """Transition any type to submodule."""
2230 from .submodule import ensure_submodule_placeholder
2232 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2233 # Already a directory, just ensure .git file exists
2234 ensure_submodule_placeholder(repo, path)
2235 else:
2236 # Remove whatever is there and create submodule
2237 if current_stat is not None:
2238 _remove_file_with_readonly_handling(full_path)
2239 ensure_submodule_placeholder(repo, path)
2241 st = os.lstat(full_path)
2242 assert entry.sha is not None
2243 index[path] = index_entry_from_stat(st, entry.sha)
2246def _transition_to_file(
2247 object_store: "BaseObjectStore",
2248 path: bytes,
2249 full_path: bytes,
2250 current_stat: os.stat_result | None,
2251 entry: IndexEntry | TreeEntry,
2252 index: Index,
2253 honor_filemode: bool,
2254 symlink_fn: Callable[
2255 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
2256 ]
2257 | None,
2258 blob_normalizer: Optional["FilterBlobNormalizer"],
2259 tree_encoding: str = "utf-8",
2260) -> None:
2261 """Transition any type to regular file or symlink."""
2262 assert entry.sha is not None and entry.mode is not None
2263 # Check if we need to update
2264 if (
2265 current_stat is not None
2266 and stat.S_ISREG(current_stat.st_mode)
2267 and not stat.S_ISLNK(entry.mode)
2268 ):
2269 # File to file - check if update needed
2270 file_matches = _check_file_matches(
2271 object_store,
2272 full_path,
2273 entry.sha,
2274 entry.mode,
2275 current_stat,
2276 honor_filemode,
2277 blob_normalizer,
2278 path,
2279 )
2280 needs_update = not file_matches
2281 elif (
2282 current_stat is not None
2283 and stat.S_ISLNK(current_stat.st_mode)
2284 and stat.S_ISLNK(entry.mode)
2285 ):
2286 # Symlink to symlink - check if update needed
2287 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
2288 needs_update = not symlink_matches
2289 else:
2290 needs_update = True
2292 if not needs_update:
2293 # Only refresh the index entry; current_stat is valid here since the file on disk is unchanged
2294 assert current_stat is not None
2295 index[path] = index_entry_from_stat(current_stat, entry.sha)
2296 return
2298 # Remove existing entry if needed
2299 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2300 # Remove directory
2301 dir_contents = set(os.listdir(full_path))
2302 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2304 if git_file_name in dir_contents:
2305 if dir_contents != {git_file_name}:
2306 raise IsADirectoryError(
2307 f"Cannot replace submodule with untracked files: {full_path!r}"
2308 )
2309 shutil.rmtree(full_path)
2310 else:
2311 try:
2312 os.rmdir(full_path)
2313 except OSError as e:
2314 if e.errno == errno.ENOTEMPTY:
2315 raise IsADirectoryError(
2316 f"Cannot replace non-empty directory with file: {full_path!r}"
2317 )
2318 raise
2319 elif current_stat is not None:
2320 _remove_file_with_readonly_handling(full_path)
2322 # Ensure parent directory exists
2323 _ensure_parent_dir_exists(full_path)
2325 # Write the file
2326 blob_obj = object_store[entry.sha]
2327 assert isinstance(blob_obj, Blob)
2328 if blob_normalizer:
2329 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
2330 st = build_file_from_blob(
2331 blob_obj,
2332 entry.mode,
2333 full_path,
2334 honor_filemode=honor_filemode,
2335 tree_encoding=tree_encoding,
2336 symlink_fn=symlink_fn,
2337 )
2338 index[path] = index_entry_from_stat(st, entry.sha)
2341def _transition_to_absent(
2342 repo: "Repo",
2343 path: bytes,
2344 full_path: bytes,
2345 current_stat: os.stat_result | None,
2346 index: Index,
2347) -> None:
2348 """Remove any type of entry."""
2349 if current_stat is None:
2350 return
2352 if stat.S_ISDIR(current_stat.st_mode):
2353 # Check if it's a submodule directory
2354 dir_contents = set(os.listdir(full_path))
2355 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2357 if git_file_name in dir_contents and dir_contents == {git_file_name}:
2358 shutil.rmtree(full_path)
2359 else:
2360 try:
2361 os.rmdir(full_path)
2362 except OSError as e:
2363 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
2364 raise
2365 else:
2366 _remove_file_with_readonly_handling(full_path)
2368 try:
2369 del index[path]
2370 except KeyError:
2371 pass
2373 # Try to remove empty parent directories
2374 _remove_empty_parents(
2375 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2376 )
2379def detect_case_only_renames(
2380 changes: Sequence["TreeChange"],
2381 config: "Config",
2382) -> list["TreeChange"]:
2383 """Detect and transform case-only renames in a list of tree changes.
2385 This function identifies file renames that only differ in case (e.g.,
2386 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
2387 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
2388 based on the repository configuration.
2390 Args:
2391 changes: List of TreeChange objects representing file changes
2392 config: Repository configuration object
2394 Returns:
2395 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
2396 """
2397 from .diff_tree import (
2398 CHANGE_ADD,
2399 CHANGE_COPY,
2400 CHANGE_DELETE,
2401 CHANGE_MODIFY,
2402 CHANGE_RENAME,
2403 TreeChange,
2404 )
2406 # Build dictionaries of old and new paths with their normalized forms
2407 old_paths_normalized = {}
2408 new_paths_normalized = {}
2409 old_changes = {} # Map from old path to change object
2410 new_changes = {} # Map from new path to change object
2412 # Get the appropriate normalizer based on config
2413 normalize_func = get_path_element_normalizer(config)
2415 def normalize_path(path: bytes) -> bytes:
2416 """Normalize entire path using element normalization."""
2417 return b"/".join(normalize_func(part) for part in path.split(b"/"))
2419 # Pre-normalize all paths once to avoid repeated normalization
2420 for change in changes:
2421 if change.type == CHANGE_DELETE and change.old:
2422 assert change.old.path is not None
2423 try:
2424 normalized = normalize_path(change.old.path)
2425 except UnicodeDecodeError:
2426 import logging
2428 logging.warning(
2429 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2430 change.old.path,
2431 )
2432 else:
2433 old_paths_normalized[normalized] = change.old.path
2434 old_changes[change.old.path] = change
2435 elif change.type == CHANGE_RENAME and change.old:
2436 assert change.old.path is not None
2437 # Treat RENAME as DELETE + ADD for case-only detection
2438 try:
2439 normalized = normalize_path(change.old.path)
2440 except UnicodeDecodeError:
2441 import logging
2443 logging.warning(
2444 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2445 change.old.path,
2446 )
2447 else:
2448 old_paths_normalized[normalized] = change.old.path
2449 old_changes[change.old.path] = change
2451 if (
2452 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
2453 and change.new
2454 ):
2455 assert change.new.path is not None
2456 try:
2457 normalized = normalize_path(change.new.path)
2458 except UnicodeDecodeError:
2459 import logging
2461 logging.warning(
2462 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2463 change.new.path,
2464 )
2465 else:
2466 new_paths_normalized[normalized] = change.new.path
2467 new_changes[change.new.path] = change
2469 # Find case-only renames and transform changes
2470 case_only_renames = set()
2471 new_rename_changes = []
2473 for norm_path, old_path in old_paths_normalized.items():
2474 if norm_path in new_paths_normalized:
2475 new_path = new_paths_normalized[norm_path]
2476 if old_path != new_path:
2477 # Found a case-only rename
2478 old_change = old_changes[old_path]
2479 new_change = new_changes[new_path]
2481 # Create a single CHANGE_RENAME to replace the DELETE + ADD/MODIFY
2482 # pair: in both cases the old file comes from the DELETE side and
2483 # the new file from the ADD/MODIFY side, so the branches collapse.
2484 rename_change = TreeChange(
2485 CHANGE_RENAME, old_change.old, new_change.new
2486 )
2494 new_rename_changes.append(rename_change)
2496 # Mark the old changes for removal
2497 case_only_renames.add(old_change)
2498 case_only_renames.add(new_change)
2500 # Return new list with original ADD/DELETE changes replaced by renames
2501 result = [change for change in changes if change not in case_only_renames]
2502 result.extend(new_rename_changes)
2503 return result
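# A hedged usage sketch: the TreeEntry values below are fabricated so a
# DELETE/ADD pair differing only in case folds into one CHANGE_RENAME.
def _example_case_only_rename(repo: "Repo") -> list["TreeChange"]:
    from dulwich.diff_tree import CHANGE_ADD, CHANGE_DELETE, TreeChange

    old = TreeEntry(b"README.txt", 0o100644, b"0" * 40)
    new = TreeEntry(b"readme.txt", 0o100644, b"0" * 40)
    changes = [
        TreeChange(CHANGE_DELETE, old, None),
        TreeChange(CHANGE_ADD, None, new),
    ]
    # On a case-insensitive config this comes back as one CHANGE_RENAME.
    return detect_case_only_renames(changes, repo.get_config())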
2506def update_working_tree(
2507 repo: "Repo",
2508 old_tree_id: bytes | None,
2509 new_tree_id: bytes,
2510 change_iterator: Iterator["TreeChange"],
2511 honor_filemode: bool = True,
2512 validate_path_element: Callable[[bytes], bool] | None = None,
2513 symlink_fn: Callable[
2514 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
2515 ]
2516 | None = None,
2517 force_remove_untracked: bool = False,
2518 blob_normalizer: Optional["FilterBlobNormalizer"] = None,
2519 tree_encoding: str = "utf-8",
2520 allow_overwrite_modified: bool = False,
2521) -> None:
2522 """Update the working tree and index to match a new tree.
2524 This function handles:
2525 - Adding new files
2526 - Updating modified files
2527 - Removing deleted files
2528 - Cleaning up empty directories
2530 Args:
2531 repo: Repository object
2532 old_tree_id: SHA of the tree before the update
2533 new_tree_id: SHA of the tree to update to
2534 change_iterator: Iterator of TreeChange objects to apply
2535 honor_filemode: An optional flag to honor core.filemode setting
2536 validate_path_element: Function to validate path elements to check out
2537 symlink_fn: Function to use for creating symlinks
2538 force_remove_untracked: If True, remove files that exist in working
2539 directory but not in target tree, even if old_tree_id is None
2540 blob_normalizer: An optional BlobNormalizer to use for converting line
2541 endings when writing blobs to the working directory.
2542 tree_encoding: Encoding used for tree paths (default: utf-8)
2543 allow_overwrite_modified: If False, raise an error when attempting to
2544 overwrite files that have been modified compared to old_tree_id
2545 """
2546 if validate_path_element is None:
2547 validate_path_element = validate_path_element_default
2549 from .diff_tree import (
2550 CHANGE_ADD,
2551 CHANGE_COPY,
2552 CHANGE_DELETE,
2553 CHANGE_MODIFY,
2554 CHANGE_RENAME,
2555 CHANGE_UNCHANGED,
2556 )
2558 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2559 index = repo.open_index()
2561 # Convert iterator to list since we need multiple passes
2562 changes = list(change_iterator)
2564 # Transform case-only renames on case-insensitive filesystems
2565 import platform
2567 default_ignore_case = platform.system() in ("Windows", "Darwin")
2568 config = repo.get_config()
2569 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
2571 if ignore_case:
2573 changes = detect_case_only_renames(changes, config)
2575 # Check for path conflicts where files need to become directories
2576 paths_becoming_dirs = set()
2577 for change in changes:
2578 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
2579 assert change.new is not None
2580 path = change.new.path
2581 assert path is not None
2582 if b"/" in path: # This is a file inside a directory
2583 # Check if any parent path exists as a file in the old tree or changes
2584 parts = path.split(b"/")
2585 for i in range(1, len(parts)):
2586 parent = b"/".join(parts[:i])
2587 # See if this parent path is being deleted (was a file, becoming a dir)
2588 for other_change in changes:
2589 if (
2590 other_change.type == CHANGE_DELETE
2591 and other_change.old
2592 and other_change.old.path == parent
2593 ):
2594 paths_becoming_dirs.add(parent)
2596 # Check if any path that needs to become a directory has been modified
2597 for path in paths_becoming_dirs:
2598 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2599 try:
2600 current_stat = os.lstat(full_path)
2601 except FileNotFoundError:
2602 continue # File doesn't exist, nothing to check
2603 except OSError as e:
2604 raise OSError(
2605 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2606 ) from e
2608 if stat.S_ISREG(current_stat.st_mode):
2609 # Find the old entry for this path
2610 old_change = None
2611 for change in changes:
2612 if (
2613 change.type == CHANGE_DELETE
2614 and change.old
2615 and change.old.path == path
2616 ):
2617 old_change = change
2618 break
2620 if old_change:
2621 # Check if file has been modified
2622 assert old_change.old is not None
2623 assert (
2624 old_change.old.sha is not None and old_change.old.mode is not None
2625 )
2626 file_matches = _check_file_matches(
2627 repo.object_store,
2628 full_path,
2629 old_change.old.sha,
2630 old_change.old.mode,
2631 current_stat,
2632 honor_filemode,
2633 blob_normalizer,
2634 path,
2635 )
2636 if not file_matches:
2637 raise OSError(
2638 f"Cannot replace modified file with directory: {path!r}"
2639 )
2641 # Check for uncommitted modifications before making any changes
2642 if not allow_overwrite_modified and old_tree_id:
2643 for change in changes:
2644 # Only check files that are being modified or deleted
2645 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
2646 path = change.old.path
2647 assert path is not None
2648 if path.startswith(b".git") or not validate_path(
2649 path, validate_path_element
2650 ):
2651 continue
2653 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2654 try:
2655 current_stat = os.lstat(full_path)
2656 except FileNotFoundError:
2657 continue # File doesn't exist, nothing to check
2658 except OSError as e:
2659 raise OSError(
2660 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2661 ) from e
2663 if stat.S_ISREG(current_stat.st_mode):
2664 # Check if working tree file differs from old tree
2665 assert change.old.sha is not None and change.old.mode is not None
2666 file_matches = _check_file_matches(
2667 repo.object_store,
2668 full_path,
2669 change.old.sha,
2670 change.old.mode,
2671 current_stat,
2672 honor_filemode,
2673 blob_normalizer,
2674 path,
2675 )
2676 if not file_matches:
2677 from .errors import WorkingTreeModifiedError
2679 raise WorkingTreeModifiedError(
2680 f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
2681 f"would be overwritten by checkout. "
2682 f"Please commit your changes or stash them before you switch branches."
2683 )
2685 # Apply the changes
2686 for change in changes:
2687 if change.type in (CHANGE_DELETE, CHANGE_RENAME):
2688 # Remove file/directory
2689 assert change.old is not None and change.old.path is not None
2690 path = change.old.path
2691 if path.startswith(b".git") or not validate_path(
2692 path, validate_path_element
2693 ):
2694 continue
2696 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2697 try:
2698 delete_stat: os.stat_result | None = os.lstat(full_path)
2699 except FileNotFoundError:
2700 delete_stat = None
2701 except OSError as e:
2702 raise OSError(
2703 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2704 ) from e
2706 _transition_to_absent(repo, path, full_path, delete_stat, index)
2708 if change.type in (
2709 CHANGE_ADD,
2710 CHANGE_MODIFY,
2711 CHANGE_UNCHANGED,
2712 CHANGE_COPY,
2713 CHANGE_RENAME,
2714 ):
2715 # Add or modify file
2716 assert (
2717 change.new is not None
2718 and change.new.path is not None
2719 and change.new.mode is not None
2720 )
2721 path = change.new.path
2722 if path.startswith(b".git") or not validate_path(
2723 path, validate_path_element
2724 ):
2725 continue
2727 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2728 try:
2729 modify_stat: os.stat_result | None = os.lstat(full_path)
2730 except FileNotFoundError:
2731 modify_stat = None
2732 except OSError as e:
2733 raise OSError(
2734 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2735 ) from e
2737 if S_ISGITLINK(change.new.mode):
2738 _transition_to_submodule(
2739 repo, path, full_path, modify_stat, change.new, index
2740 )
2741 else:
2742 _transition_to_file(
2743 repo.object_store,
2744 path,
2745 full_path,
2746 modify_stat,
2747 change.new,
2748 index,
2749 honor_filemode,
2750 symlink_fn,
2751 blob_normalizer,
2752 tree_encoding,
2753 )
2755 index.write()
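# A sketch of wiring update_working_tree to a tree diff, roughly what a
# branch switch does; old_tree/new_tree are assumed tree ids supplied by
# the caller (e.g. the tree fields of two commits).
def _example_switch_trees(repo: "Repo", old_tree: bytes, new_tree: bytes) -> None:
    from dulwich.diff_tree import tree_changes

    changes = tree_changes(repo.object_store, old_tree, new_tree)
    update_working_tree(repo, old_tree, new_tree, change_iterator=changes)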
2758def _check_entry_for_changes(
2759 tree_path: bytes,
2760 entry: IndexEntry | ConflictedIndexEntry,
2761 root_path: bytes,
2762 filter_blob_callback: Callable[[bytes, bytes], bytes] | None = None,
2763) -> bytes | None:
2764 """Check a single index entry for changes.
2766 Args:
2767 tree_path: Path in the tree
2768 entry: Index entry to check
2769 root_path: Root filesystem path
2770 filter_blob_callback: Optional callback to filter blobs
2771 Returns: tree_path if changed, None otherwise
2772 """
2773 if isinstance(entry, ConflictedIndexEntry):
2774 # Conflicted files are always unstaged
2775 return tree_path
2777 full_path = _tree_to_fs_path(root_path, tree_path)
2778 try:
2779 st = os.lstat(full_path)
2780 if stat.S_ISDIR(st.st_mode):
2781 if _has_directory_changed(tree_path, entry):
2782 return tree_path
2783 return None
2785 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
2786 return None
2788 blob = blob_from_path_and_stat(full_path, st)
2790 if filter_blob_callback is not None:
2791 blob.data = filter_blob_callback(blob.data, tree_path)
2792 except FileNotFoundError:
2793 # The file was removed, so we assume that counts as
2794 # different from whatever file used to exist.
2795 return tree_path
2796 else:
2797 if blob.id != entry.sha:
2798 return tree_path
2799 return None
2802def get_unstaged_changes(
2803 index: Index,
2804 root_path: str | bytes,
2805 filter_blob_callback: Callable[..., Any] | None = None,
2806 preload_index: bool = False,
2807) -> Generator[bytes, None, None]:
2808 """Walk through an index and check for differences against working tree.
2810 Args:
2811 index: index to check
2812 root_path: path in which to find files
2813 filter_blob_callback: Optional callback to filter blobs
2814 preload_index: If True, use parallel threads to check files (requires threading support)
2815 Returns: iterator over paths with unstaged changes
2816 """
2817 # For each entry in the index, compare against the working tree for unstaged changes
2818 if not isinstance(root_path, bytes):
2819 root_path = os.fsencode(root_path)
2821 if preload_index:
2822 # Use parallel processing for better performance on slow filesystems
2823 try:
2824 import multiprocessing
2825 from concurrent.futures import ThreadPoolExecutor
2826 except ImportError:
2827 # If threading is not available, fall back to serial processing
2828 preload_index = False
2829 else:
2830 # Collect all entries first
2831 entries = list(index.iteritems())
2833 # Use number of CPUs but cap at 8 threads to avoid overhead
2834 num_workers = min(multiprocessing.cpu_count(), 8)
2836 # Process entries in parallel
2837 with ThreadPoolExecutor(max_workers=num_workers) as executor:
2838 # Submit all tasks
2839 futures = [
2840 executor.submit(
2841 _check_entry_for_changes,
2842 tree_path,
2843 entry,
2844 root_path,
2845 filter_blob_callback,
2846 )
2847 for tree_path, entry in entries
2848 ]
2850 # Yield results in submission order, blocking on each future in turn
2851 for future in futures:
2852 result = future.result()
2853 if result is not None:
2854 yield result
2856 if not preload_index:
2857 # Serial processing
2858 for tree_path, entry in index.iteritems():
2859 result = _check_entry_for_changes(
2860 tree_path, entry, root_path, filter_blob_callback
2861 )
2862 if result is not None:
2863 yield result
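# A minimal sketch (repository at "." is an assumption) of listing paths
# with unstaged changes, optionally using the parallel preload path.
def _example_unstaged_paths() -> list[bytes]:
    from dulwich.repo import Repo

    repo = Repo(".")
    index = repo.open_index()
    return list(get_unstaged_changes(index, repo.path, preload_index=True))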
2866def _tree_to_fs_path(
2867 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
2868) -> bytes:
2869 """Convert a git tree path to a file system path.
2871 Args:
2872 root_path: Root filesystem path
2873 tree_path: Git tree path as bytes (encoded with tree_encoding)
2874 tree_encoding: Encoding used for tree paths (default: utf-8)
2876 Returns: File system path.
2877 """
2878 assert isinstance(tree_path, bytes)
2879 if os_sep_bytes != b"/":
2880 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
2881 else:
2882 sep_corrected_path = tree_path
2884 # On Windows, we need to handle tree path encoding properly
2885 if sys.platform == "win32":
2886 # Decode from tree encoding, then re-encode for filesystem
2887 try:
2888 tree_path_str = sep_corrected_path.decode(tree_encoding)
2889 sep_corrected_path = os.fsencode(tree_path_str)
2890 except UnicodeDecodeError:
2891 # If decoding fails, use the original bytes
2892 pass
2894 return os.path.join(root_path, sep_corrected_path)
2897def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes:
2898 """Convert a file system path to a git tree path.
2900 Args:
2901 fs_path: File system path.
2902 tree_encoding: Encoding to use for tree paths (default: utf-8)
2904 Returns: Git tree path as bytes (encoded with tree_encoding)
2905 """
2906 if not isinstance(fs_path, bytes):
2907 fs_path_bytes = os.fsencode(fs_path)
2908 else:
2909 fs_path_bytes = fs_path
2911 # On Windows, we need to ensure tree paths are properly encoded
2912 if sys.platform == "win32":
2913 try:
2914 # Decode from filesystem encoding, then re-encode with tree encoding
2915 fs_path_str = os.fsdecode(fs_path_bytes)
2916 fs_path_bytes = fs_path_str.encode(tree_encoding)
2917 except UnicodeDecodeError:
2918 # If filesystem decoding fails, use the original bytes
2919 pass
2921 if os_sep_bytes != b"/":
2922 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
2923 else:
2924 tree_path = fs_path_bytes
2925 return tree_path
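# A round-trip illustration of the two path helpers above: on POSIX both
# directions are byte-identical, while on Windows separators (and, where
# needed, encodings) are converted. The paths are illustrative only.
def _example_path_round_trip() -> bool:
    fs_path = _tree_to_fs_path(b"/tmp/repo", b"dir/file.txt")
    relative = fs_path[len(b"/tmp/repo") + 1 :]
    return _fs_to_tree_path(relative) == b"dir/file.txt"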
2928def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None:
2929 """Create an index entry for a directory.
2931 This is only used for submodules (directories containing .git).
2933 Args:
2934 st: Stat result for the directory
2935 path: Path to the directory
2937 Returns:
2938 IndexEntry for a submodule, or None if not a submodule
2939 """
2940 if os.path.exists(os.path.join(path, b".git")):
2941 head = read_submodule_head(path)
2942 if head is None:
2943 return None
2944 return index_entry_from_stat(st, head, mode=S_IFGITLINK)
2945 return None
2948def index_entry_from_path(
2949 path: bytes, object_store: ObjectContainer | None = None
2950) -> IndexEntry | None:
2951 """Create an index from a filesystem path.
2953 This returns an index value for files, symlinks
2954 and tree references. for directories and
2955 non-existent files it returns None
2957 Args:
2958 path: Path to create an index entry for
2959 object_store: Optional object store to
2960 save new blobs in
2961 Returns: An index entry; None for directories
2962 """
2963 assert isinstance(path, bytes)
2964 st = os.lstat(path)
2965 if stat.S_ISDIR(st.st_mode):
2966 return index_entry_from_directory(st, path)
2968 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
2969 blob = blob_from_path_and_stat(path, st)
2970 if object_store is not None:
2971 object_store.add_object(blob)
2972 return index_entry_from_stat(st, blob.id)
2974 return None
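# A hedged sketch of staging one file: build an entry from disk, store the
# blob, and record it under its tree path. Assumes fs_path is relative to
# the repository root so the tree path is just a separator conversion.
def _example_stage_file(repo: "Repo", fs_path: bytes) -> None:
    entry = index_entry_from_path(fs_path, object_store=repo.object_store)
    if entry is not None:  # None for plain directories
        index = repo.open_index()
        index[_fs_to_tree_path(fs_path)] = entry
        index.write()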
2977def iter_fresh_entries(
2978 paths: Iterable[bytes],
2979 root_path: bytes,
2980 object_store: ObjectContainer | None = None,
2981) -> Iterator[tuple[bytes, IndexEntry | None]]:
2982 """Iterate over current versions of index entries on disk.
2984 Args:
2985 paths: Paths to iterate over
2986 root_path: Root path to access from
2987 object_store: Optional store to save new blobs in
2988 Returns: Iterator over path, index_entry
2989 """
2990 for path in paths:
2991 p = _tree_to_fs_path(root_path, path)
2992 try:
2993 entry = index_entry_from_path(p, object_store=object_store)
2994 except (FileNotFoundError, IsADirectoryError):
2995 entry = None
2996 yield path, entry
2999def iter_fresh_objects(
3000 paths: Iterable[bytes],
3001 root_path: bytes,
3002 include_deleted: bool = False,
3003 object_store: ObjectContainer | None = None,
3004) -> Iterator[tuple[bytes, bytes | None, int | None]]:
3005 """Iterate over versions of objects on disk referenced by index.
3007 Args:
3008 paths: Paths to check
3009 root_path: Root path to access from
3010 include_deleted: Include deleted entries with sha and
3011 mode set to None
3012 object_store: Optional object store to report new items to
3013 Returns: Iterator over path, sha, mode
3014 """
3015 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
3016 if entry is None:
3017 if include_deleted:
3018 yield path, None, None
3019 else:
3020 yield path, entry.sha, cleanup_mode(entry.mode)
3023def refresh_index(index: Index, root_path: bytes) -> None:
3024 """Refresh the contents of an index.
3026 This is the equivalent of the index refresh performed by 'git commit -a'.
3028 Args:
3029 index: Index to update
3030 root_path: Root filesystem path
3031 """
3032 for path, entry in iter_fresh_entries(index, root_path):
3033 if entry:
3034 index[path] = entry
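# A minimal sketch of refreshing the index from the working tree and
# persisting the result; the Repo argument is assumed to be open.
def _example_refresh(repo: "Repo") -> None:
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()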
3037class locked_index:
3038 """Lock the index while making modifications.
3040 Works as a context manager.
3041 """
3043 _file: "_GitFile"
3045 def __init__(self, path: bytes | str) -> None:
3046 """Initialize locked_index."""
3047 self._path = path
3049 def __enter__(self) -> Index:
3050 """Enter context manager and lock index."""
3051 f = GitFile(self._path, "wb")
3052 self._file = f
3053 self._index = Index(self._path)
3054 return self._index
3056 def __exit__(
3057 self,
3058 exc_type: type | None,
3059 exc_value: BaseException | None,
3060 traceback: types.TracebackType | None,
3061 ) -> None:
3062 """Exit context manager and unlock index."""
3063 if exc_type is not None:
3064 self._file.abort()
3065 return
3066 try:
3067 f = SHA1Writer(self._file)
3068 write_index_dict(f, self._index._byname)
3069 except BaseException:
3070 self._file.abort()
3071 else:
3072 f.close()
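# A usage sketch of the locked_index context manager: mutations happen
# under the lock and are written out via SHA1Writer on a clean exit, or
# aborted if an exception escapes. Paths/shas here are illustrative, and
# the repository root is assumed to be the current working directory so
# the tree path and filesystem path coincide.
def _example_locked_update(index_path: str, path: bytes, sha: bytes) -> None:
    with locked_index(index_path) as index:
        st = os.lstat(path)
        index[path] = index_entry_from_stat(st, sha)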