# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence, Set
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
if sys.version_info >= (3, 10):
    TreeDict = dict[bytes, Union["TreeDict", tuple[int, bytes]]]
else:
    TreeDict = dict[bytes, Any]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
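

# Illustrative example (added; not part of the upstream module): a round
# trip through the varint helpers above. 300 needs more than 7 bits, so it
# encodes to two bytes with the continuation bit set on the first.
#
#   >>> _encode_varint(300)
#   b'\xac\x02'
#   >>> _decode_varint(b'\xac\x02')
#   (300, 2)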


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"
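

# Illustrative example (added): compressing b"src/main_test.py" against the
# previous entry b"src/main.py" keeps the 8-byte common prefix b"src/main",
# so 3 trailing bytes of the previous path are dropped and the suffix
# appended, NUL-terminated.
#
#   >>> _compress_path(b"src/main_test.py", b"src/main.py")
#   b'\x03_test.py\x00'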


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
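

# Illustrative example (added): the inverse of the compression shown above;
# the returned offset points just past the NUL terminator.
#
#   >>> _decompress_path(b"\x03_test.py\x00", 0, b"src/main.py")
#   (b'src/main_test.py', 10)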


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed
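

# Illustrative example (added): the same decompression, but reading from a
# stream; the second element counts bytes consumed from the file object.
#
#   >>> import io
#   >>> _decompress_path_from_stream(io.BytesIO(b"\x03_test.py\x00"), b"src/main.py")
#   (b'src/main_test.py', 10)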


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
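

# Illustrative example (added): the stage is stored in bits 12-13 of the
# flags field, so flags of 0x2005 (stage 2, name length 5) decode to
# MERGE_CONFLICT_THIS.
#
#   >>> Stage((0x2005 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#   <Stage.MERGE_CONFLICT_THIS: 2>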


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that don't understand sparse index should avoid interacting with
    the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )
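

# Illustrative example (added; the sha shown is git's well-known empty-blob
# hash): toggling skip-worktree also maintains FLAG_EXTENDED, which forces
# the entry to be written in index format v3 or later.
#
#   >>> entry = IndexEntry(ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
#   ...                    uid=0, gid=0, size=0,
#   ...                    sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391")
#   >>> entry.set_skip_worktree(True)
#   >>> entry.skip_worktree, bool(entry.flags & FLAG_EXTENDED)
#   (True, True)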


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
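

# Illustrative example (added): pathsplit/pathjoin operate on /-delimited
# byte paths; empty components are dropped on join.
#
#   >>> pathsplit(b"src/objects/blob.py")
#   (b'src/objects', b'blob.py')
#   >>> pathsplit(b"README")
#   (b'', b'README')
#   >>> pathjoin(b"", b"src", b"objects")
#   b'src/objects'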


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
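

# Illustrative example (added): a minimal valid header is the b"DIRC" magic
# followed by two big-endian 32-bit words (version, entry count).
#
#   >>> import io
#   >>> read_index_header(io.BytesIO(b"DIRC" + struct.pack(">LL", 2, 0)))
#   (2, 0)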


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature: four ASCII letters.
        # Optional extensions use uppercase signatures, while mandatory
        # ones (such as the lowercase b"sdir" defined above) use lowercase,
        # so both cases must be accepted here.
        if not all(65 <= b <= 90 or 97 <= b <= 122 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Keys are repository paths; all merge stages for a path are collapsed
    into a single ConflictedIndexEntry value, since a path alone is not
    unique across stages.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Iterable over the entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # If any entry uses extended flags, the index must be at least version 3.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Bump the version to 3
        version = 3
    # Final sanity check: no extended flags may remain in an index < v3.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path and then by merge-conflict stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)
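

# Illustrative round trip (added; the entry is hypothetical and reuses
# git's empty-blob hash): write a one-entry index to a buffer, then read
# it back.
#
#   >>> import io
#   >>> entry = IndexEntry(ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
#   ...                    uid=0, gid=0, size=0,
#   ...                    sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391")
#   >>> buf = io.BytesIO()
#   >>> write_index_dict(buf, {b"empty.txt": entry})
#   >>> _ = buf.seek(0)
#   >>> read_index_dict(buf)[b"empty.txt"].sha
#   b'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391'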


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
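

# Illustrative example (added): only the owner-executable bit survives into
# the canonical tree modes; other permission bits are normalized away.
#
#   >>> oct(cleanup_mode(0o100775))
#   '0o100755'
#   >>> oct(cleanup_mode(0o100600))
#   '0o100644'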


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike[str]],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: bytes,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=subtree_sha,
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> Optional[bytes]:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
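

# Illustrative usage (added): build a tree from one blob using an in-memory
# object store; MemoryObjectStore and Blob.from_string come from dulwich's
# object_store and objects modules.
#
#   >>> from dulwich.object_store import MemoryObjectStore
#   >>> store = MemoryObjectStore()
#   >>> blob = Blob.from_string(b"hello\n")
#   >>> store.add_object(blob)
#   >>> tree_id = commit_tree(store, [(b"docs/hello.txt", blob.id, 0o100644)])
#   >>> len(tree_id)  # hex tree sha, usable as a commit's tree
#   40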


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )
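

# Illustrative usage (added; the path is hypothetical): derive an entry for
# a file in the working tree from its lstat result and a known blob sha.
#
#   st = os.lstat(b"docs/hello.txt")
#   entry = index_entry_from_stat(st, b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391")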


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[
        Callable[
            [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
            None,
        ]
    ] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
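

# Illustrative example (added): validation is per /-separated element, so a
# .git component anywhere in the path is rejected with the default rules.
#
#   >>> validate_path(b"src/main.py")
#   True
#   >>> validate_path(b"sub/.git/config")
#   False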
1882def build_index_from_tree(
1883 root_path: Union[str, bytes],
1884 index_path: Union[str, bytes],
1885 object_store: ObjectContainer,
1886 tree_id: bytes,
1887 honor_filemode: bool = True,
1888 validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
1889 symlink_fn: Optional[
1890 Callable[
1891 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
1892 None,
1893 ]
1894 ] = None,
1895 blob_normalizer: Optional["FilterBlobNormalizer"] = None,
1896 tree_encoding: str = "utf-8",
1897) -> None:
1898 """Generate and materialize index from a tree.
1900 Args:
1901 tree_id: Tree to materialize
1902 root_path: Target dir for materialized index files
1903 index_path: Target path for generated index
1904 object_store: Non-empty object store holding tree contents
1905 honor_filemode: An optional flag to honor core.filemode setting in
1906 config file, default is core.filemode=True, change executable bit
1907 validate_path_element: Function to validate path elements to check
1908 out; default just refuses .git and .. directories.
1909 symlink_fn: Function to use for creating symlinks
1910 blob_normalizer: An optional BlobNormalizer to use for converting line
1911 endings when writing blobs to the working directory.
1912 tree_encoding: Encoding used for tree paths (default: utf-8)
1914 Note: existing index is wiped and contents are not merged
1915 in a working dir. Suitable only for fresh clones.
1916 """
1917 index = Index(index_path, read=False)
1918 if not isinstance(root_path, bytes):
1919 root_path = os.fsencode(root_path)
1921 for entry in iter_tree_contents(object_store, tree_id):
1922 assert (
1923 entry.path is not None and entry.mode is not None and entry.sha is not None
1924 )
1925 if not validate_path(entry.path, validate_path_element):
1926 continue
1927 full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)
1929 if not os.path.exists(os.path.dirname(full_path)):
1930 os.makedirs(os.path.dirname(full_path))
1932 # TODO(jelmer): Merge new index into working tree
1933 if S_ISGITLINK(entry.mode):
1934 if not os.path.isdir(full_path):
1935 os.mkdir(full_path)
1936 st = os.lstat(full_path)
1937 # TODO(jelmer): record and return submodule paths
1938 else:
1939 obj = object_store[entry.sha]
1940 assert isinstance(obj, Blob)
1941 # Apply blob normalization for checkout if normalizer is provided
1942 if blob_normalizer is not None:
1943 obj = blob_normalizer.checkout_normalize(obj, entry.path)
1944 st = build_file_from_blob(
1945 obj,
1946 entry.mode,
1947 full_path,
1948 honor_filemode=honor_filemode,
1949 tree_encoding=tree_encoding,
1950 symlink_fn=symlink_fn,
1951 )
1953 # Add file to index
1954 if not honor_filemode or S_ISGITLINK(entry.mode):
1955 # we can not use tuple slicing to build a new tuple,
1956 # because on windows that will convert the times to
1957 # longs, which causes errors further along
1958 st_tuple = (
1959 entry.mode,
1960 st.st_ino,
1961 st.st_dev,
1962 st.st_nlink,
1963 st.st_uid,
1964 st.st_gid,
1965 st.st_size,
1966 st.st_atime,
1967 st.st_mtime,
1968 st.st_ctime,
1969 )
1970 st = st.__class__(st_tuple)
1971 # default to a stage 0 index entry (normal)
1972 # when reading from the filesystem
1973 index[entry.path] = index_entry_from_stat(st, entry.sha)
1975 index.write()
1978def blob_from_path_and_mode(
1979 fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
1980) -> Blob:
1981 """Create a blob from a path and a stat object.
1983 Args:
1984 fs_path: Full file system path to file
1985 mode: File mode
1986 tree_encoding: Encoding to use for tree contents
1987 Returns: A `Blob` object
1988 """
1989 assert isinstance(fs_path, bytes)
1990 blob = Blob()
1991 if stat.S_ISLNK(mode):
1992 if sys.platform == "win32":
1993 # os.readlink on Python3 on Windows requires a unicode string.
1994 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
1995 else:
1996 blob.data = os.readlink(fs_path)
1997 else:
1998 with open(fs_path, "rb") as f:
1999 blob.data = f.read()
2000 return blob
2003def blob_from_path_and_stat(
2004 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
2005) -> Blob:
2006 """Create a blob from a path and a stat object.
2008 Args:
2009 fs_path: Full file system path to file
2010 st: A stat object
2011 tree_encoding: Encoding to use for tree contents
2012 Returns: A `Blob` object
2013 """
2014 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
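
# A short sketch of hashing a working-tree file as a git blob, the way
# the unstaged-changes check further below does. The file path is a
# hypothetical placeholder.
def _example_blob_from_path_and_stat() -> None:
    path = b"/tmp/repo/README"  # hypothetical file
    st = os.lstat(path)
    blob = blob_from_path_and_stat(path, st)
    print(blob.id)  # the SHA-1 this file would have as a blob object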
2017def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
2018 """Read the head commit of a submodule.
2020 Args:
2021 path: path to the submodule
2022 Returns: HEAD sha, or None if not a valid repository or no HEAD
2023 """
2024 from .errors import NotGitRepository
2025 from .repo import Repo
2027 # Repo currently expects a "str", so decode if necessary.
2028 # TODO(jelmer): Perhaps move this into Repo() ?
2029 if not isinstance(path, str):
2030 path = os.fsdecode(path)
2031 try:
2032 repo = Repo(path)
2033 except NotGitRepository:
2034 return None
2035 try:
2036 return repo.head()
2037 except KeyError:
2038 return None
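
# A small sketch of probing a candidate submodule directory: the function
# above returns None rather than raising when the path is not a
# repository. The path is a hypothetical placeholder.
def _example_read_submodule_head() -> None:
    head = read_submodule_head(b"/tmp/repo/vendor/lib")
    if head is None:
        print("not a git repository (or HEAD unset)")
    else:
        print("submodule HEAD:", head.decode("ascii"))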
2041def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
2042 """Check if a directory has changed after getting an error.
2044 Call this function when an error occurs while trying to create a blob
2045 from a path. It checks whether the path is a directory. If it is a
2046 directory and a submodule, the submodule head is compared to see
2047 whether it has changed. Otherwise the entry is considered changed,
2048 since Git tracked a file there and not a directory.
2050 Returns True if the given path should be considered changed, and
2051 False otherwise (including when the path is not a directory).
2052 """
2053 # This is actually a directory
2054 if os.path.exists(os.path.join(tree_path, b".git")):
2055 # Submodule
2056 head = read_submodule_head(tree_path)
2057 if entry.sha != head:
2058 return True
2059 else:
2060 # The file was changed to a directory, so consider it removed.
2061 return True
2063 return False
2066os_sep_bytes = os.sep.encode("ascii")
2069def _ensure_parent_dir_exists(full_path: bytes) -> None:
2070 """Ensure parent directory exists, checking no parent is a file."""
2071 parent_dir = os.path.dirname(full_path)
2072 if parent_dir and not os.path.exists(parent_dir):
2073 # Walk up the directory tree to find the first existing parent
2074 current = parent_dir
2075 parents_to_check: list[bytes] = []
2077 while current and not os.path.exists(current):
2078 parents_to_check.insert(0, current)
2079 new_parent = os.path.dirname(current)
2080 if new_parent == current:
2081 # Reached the root or can't go up further
2082 break
2083 current = new_parent
2085 # Check if the existing parent (if any) is a directory
2086 if current and os.path.exists(current) and not os.path.isdir(current):
2087 raise OSError(
2088 f"Cannot create directory, parent path is a file: {current!r}"
2089 )
2091 # Now check each parent we need to create isn't blocked by an existing file
2092 for parent_path in parents_to_check:
2093 if os.path.exists(parent_path) and not os.path.isdir(parent_path):
2094 raise OSError(
2095 f"Cannot create directory, parent path is a file: {parent_path!r}"
2096 )
2098 os.makedirs(parent_dir)
2101def _remove_file_with_readonly_handling(path: bytes) -> None:
2102 """Remove a file, handling read-only files on Windows.
2104 Args:
2105 path: Path to the file to remove
2106 """
2107 try:
2108 os.unlink(path)
2109 except PermissionError:
2110 # On Windows, remove read-only attribute and retry
2111 if sys.platform == "win32":
2112 os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
2113 os.unlink(path)
2114 else:
2115 raise
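
# A sketch of the same read-only workaround applied to whole directory
# trees: clear the read-only bit in an error handler and retry. This is
# an illustrative assumption, not a helper this module defines; the
# module itself only retries single-file unlinks.
def _example_rmtree_readonly(path: bytes) -> None:
    def _clear_readonly_and_retry(func, p, exc_info):
        # Same recovery as above: make the path writable, then retry.
        os.chmod(p, stat.S_IWRITE | stat.S_IREAD)
        func(p)

    shutil.rmtree(path, onerror=_clear_readonly_and_retry)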
2118def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
2119 """Remove empty parent directories up to stop_at."""
2120 parent = os.path.dirname(path)
2121 while parent and parent != stop_at:
2122 try:
2123 os.rmdir(parent)
2124 parent = os.path.dirname(parent)
2125 except FileNotFoundError:
2126 # Directory doesn't exist - stop trying
2127 break
2128 except OSError as e:
2129 if e.errno == errno.ENOTEMPTY:
2130 # Directory not empty - stop trying
2131 break
2132 raise
2135def _check_symlink_matches(
2136 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
2137) -> bool:
2138 """Check if symlink target matches expected target.
2140 Returns True if symlink matches, False if it doesn't match.
2141 """
2142 try:
2143 current_target = os.readlink(full_path)
2144 blob_obj = repo_object_store[entry_sha]
2145 expected_target = blob_obj.as_raw_string()
2146 if isinstance(current_target, str):
2147 current_target = current_target.encode()
2148 return current_target == expected_target
2149 except FileNotFoundError:
2150 # Symlink doesn't exist
2151 return False
2152 except OSError as e:
2153 if e.errno == errno.EINVAL:
2154 # Not a symlink
2155 return False
2156 raise
2159def _check_file_matches(
2160 repo_object_store: "BaseObjectStore",
2161 full_path: bytes,
2162 entry_sha: bytes,
2163 entry_mode: int,
2164 current_stat: os.stat_result,
2165 honor_filemode: bool,
2166 blob_normalizer: Optional["FilterBlobNormalizer"] = None,
2167 tree_path: Optional[bytes] = None,
2168) -> bool:
2169 """Check if a file on disk matches the expected git object.
2171 Returns True if file matches, False if it doesn't match.
2172 """
2173 # Check mode first (if honor_filemode is True)
2174 if honor_filemode:
2175 current_mode = stat.S_IMODE(current_stat.st_mode)
2176 expected_mode = stat.S_IMODE(entry_mode)
2178 # For regular files, only check the user executable bit, not group/other permissions
2179 # This matches Git's behavior where umask differences don't count as modifications
2180 if stat.S_ISREG(current_stat.st_mode):
2181 # Normalize regular file modes to ignore group/other write permissions
2182 current_mode_normalized = (
2183 current_mode & 0o755
2184 ) # Keep only user rwx and all read+execute
2185 expected_mode_normalized = expected_mode & 0o755
2187 # For Git compatibility, regular files should be either 644 or 755
2188 if expected_mode_normalized not in (0o644, 0o755):
2189 expected_mode_normalized = 0o644 # Default for regular files
2190 if current_mode_normalized not in (0o644, 0o755):
2191 # Determine if it should be executable based on user execute bit
2192 if current_mode & 0o100: # User execute bit is set
2193 current_mode_normalized = 0o755
2194 else:
2195 current_mode_normalized = 0o644
2197 if current_mode_normalized != expected_mode_normalized:
2198 return False
2199 else:
2200 # For non-regular files (symlinks, etc.), check mode exactly
2201 if current_mode != expected_mode:
2202 return False
2204 # If mode matches (or we don't care), check content via size first
2205 blob_obj = repo_object_store[entry_sha]
2206 if current_stat.st_size != blob_obj.raw_length():
2207 return False
2209 # Size matches, check actual content
2210 try:
2211 with open(full_path, "rb") as f:
2212 current_content = f.read()
2213 expected_content = blob_obj.as_raw_string()
2214 if blob_normalizer and tree_path is not None:
2215 assert isinstance(blob_obj, Blob)
2216 normalized_blob = blob_normalizer.checkout_normalize(
2217 blob_obj, tree_path
2218 )
2219 expected_content = normalized_blob.as_raw_string()
2220 return current_content == expected_content
2221 except (FileNotFoundError, PermissionError, IsADirectoryError):
2222 return False
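
# A worked example of the mode normalization above: umask noise such as
# group-write is ignored, and the user execute bit decides between the
# two canonical git modes 0o644 and 0o755. The helper restates the logic
# inline for illustration only.
def _example_mode_normalization() -> None:
    def normalize(mode: int) -> int:
        mode &= 0o755  # drop group/other write bits
        if mode in (0o644, 0o755):
            return mode
        return 0o755 if mode & 0o100 else 0o644

    assert normalize(0o664) == 0o644  # umask difference: not a change
    assert normalize(0o775) == 0o755  # executable bit preserved
    assert normalize(0o600) == 0o644  # unusual modes default to 0o644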
2225def _transition_to_submodule(
2226 repo: "Repo",
2227 path: bytes,
2228 full_path: bytes,
2229 current_stat: Optional[os.stat_result],
2230 entry: Union[IndexEntry, TreeEntry],
2231 index: Index,
2232) -> None:
2233 """Transition any type to submodule."""
2234 from .submodule import ensure_submodule_placeholder
2236 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2237 # Already a directory, just ensure .git file exists
2238 ensure_submodule_placeholder(repo, path)
2239 else:
2240 # Remove whatever is there and create submodule
2241 if current_stat is not None:
2242 _remove_file_with_readonly_handling(full_path)
2243 ensure_submodule_placeholder(repo, path)
2245 st = os.lstat(full_path)
2246 assert entry.sha is not None
2247 index[path] = index_entry_from_stat(st, entry.sha)
2250def _transition_to_file(
2251 object_store: "BaseObjectStore",
2252 path: bytes,
2253 full_path: bytes,
2254 current_stat: Optional[os.stat_result],
2255 entry: Union[IndexEntry, TreeEntry],
2256 index: Index,
2257 honor_filemode: bool,
2258 symlink_fn: Optional[
2259 Callable[
2260 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
2261 None,
2262 ]
2263 ],
2264 blob_normalizer: Optional["FilterBlobNormalizer"],
2265 tree_encoding: str = "utf-8",
2266) -> None:
2267 """Transition any type to regular file or symlink."""
2268 assert entry.sha is not None and entry.mode is not None
2269 # Check if we need to update
2270 if (
2271 current_stat is not None
2272 and stat.S_ISREG(current_stat.st_mode)
2273 and not stat.S_ISLNK(entry.mode)
2274 ):
2275 # File to file - check if update needed
2276 file_matches = _check_file_matches(
2277 object_store,
2278 full_path,
2279 entry.sha,
2280 entry.mode,
2281 current_stat,
2282 honor_filemode,
2283 blob_normalizer,
2284 path,
2285 )
2286 needs_update = not file_matches
2287 elif (
2288 current_stat is not None
2289 and stat.S_ISLNK(current_stat.st_mode)
2290 and stat.S_ISLNK(entry.mode)
2291 ):
2292 # Symlink to symlink - check if update needed
2293 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
2294 needs_update = not symlink_matches
2295 else:
2296 needs_update = True
2298 if not needs_update:
2299 # Just update index - current_stat should always be valid here since we're not updating
2300 assert current_stat is not None
2301 index[path] = index_entry_from_stat(current_stat, entry.sha)
2302 return
2304 # Remove existing entry if needed
2305 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2306 # Remove directory
2307 dir_contents = set(os.listdir(full_path))
2308 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2310 if git_file_name in dir_contents:
2311 if dir_contents != {git_file_name}:
2312 raise IsADirectoryError(
2313 f"Cannot replace submodule with untracked files: {full_path!r}"
2314 )
2315 shutil.rmtree(full_path)
2316 else:
2317 try:
2318 os.rmdir(full_path)
2319 except OSError as e:
2320 if e.errno == errno.ENOTEMPTY:
2321 raise IsADirectoryError(
2322 f"Cannot replace non-empty directory with file: {full_path!r}"
2323 )
2324 raise
2325 elif current_stat is not None:
2326 _remove_file_with_readonly_handling(full_path)
2328 # Ensure parent directory exists
2329 _ensure_parent_dir_exists(full_path)
2331 # Write the file
2332 blob_obj = object_store[entry.sha]
2333 assert isinstance(blob_obj, Blob)
2334 if blob_normalizer:
2335 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
2336 st = build_file_from_blob(
2337 blob_obj,
2338 entry.mode,
2339 full_path,
2340 honor_filemode=honor_filemode,
2341 tree_encoding=tree_encoding,
2342 symlink_fn=symlink_fn,
2343 )
2344 index[path] = index_entry_from_stat(st, entry.sha)
2347def _transition_to_absent(
2348 repo: "Repo",
2349 path: bytes,
2350 full_path: bytes,
2351 current_stat: Optional[os.stat_result],
2352 index: Index,
2353) -> None:
2354 """Remove any type of entry."""
2355 if current_stat is None:
2356 return
2358 if stat.S_ISDIR(current_stat.st_mode):
2359 # Check if it's a submodule directory
2360 dir_contents = set(os.listdir(full_path))
2361 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2363 if git_file_name in dir_contents and dir_contents == {git_file_name}:
2364 shutil.rmtree(full_path)
2365 else:
2366 try:
2367 os.rmdir(full_path)
2368 except OSError as e:
2369 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
2370 raise
2371 else:
2372 _remove_file_with_readonly_handling(full_path)
2374 try:
2375 del index[path]
2376 except KeyError:
2377 pass
2379 # Try to remove empty parent directories
2380 _remove_empty_parents(
2381 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2382 )
2385def detect_case_only_renames(
2386 changes: Sequence["TreeChange"],
2387 config: "Config",
2388) -> list["TreeChange"]:
2389 """Detect and transform case-only renames in a list of tree changes.
2391 This function identifies file renames that only differ in case (e.g.,
2392 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
2393 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
2394 based on the repository configuration.
2396 Args:
2397 changes: List of TreeChange objects representing file changes
2398 config: Repository configuration object
2400 Returns:
2401 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
2402 """
2403 from .diff_tree import (
2404 CHANGE_ADD,
2405 CHANGE_COPY,
2406 CHANGE_DELETE,
2407 CHANGE_MODIFY,
2408 CHANGE_RENAME,
2409 TreeChange,
2410 )
2412 # Build dictionaries of old and new paths with their normalized forms
2413 old_paths_normalized = {}
2414 new_paths_normalized = {}
2415 old_changes = {} # Map from old path to change object
2416 new_changes = {} # Map from new path to change object
2418 # Get the appropriate normalizer based on config
2419 normalize_func = get_path_element_normalizer(config)
2421 def normalize_path(path: bytes) -> bytes:
2422 """Normalize entire path using element normalization."""
2423 return b"/".join(normalize_func(part) for part in path.split(b"/"))
2425 # Pre-normalize all paths once to avoid repeated normalization
2426 for change in changes:
2427 if change.type == CHANGE_DELETE and change.old:
2428 assert change.old.path is not None
2429 try:
2430 normalized = normalize_path(change.old.path)
2431 except UnicodeDecodeError:
2432 import logging
2434 logging.warning(
2435 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2436 change.old.path,
2437 )
2438 else:
2439 old_paths_normalized[normalized] = change.old.path
2440 old_changes[change.old.path] = change
2441 elif change.type == CHANGE_RENAME and change.old:
2442 assert change.old.path is not None
2443 # Treat RENAME as DELETE + ADD for case-only detection
2444 try:
2445 normalized = normalize_path(change.old.path)
2446 except UnicodeDecodeError:
2447 import logging
2449 logging.warning(
2450 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2451 change.old.path,
2452 )
2453 else:
2454 old_paths_normalized[normalized] = change.old.path
2455 old_changes[change.old.path] = change
2457 if (
2458 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
2459 and change.new
2460 ):
2461 assert change.new.path is not None
2462 try:
2463 normalized = normalize_path(change.new.path)
2464 except UnicodeDecodeError:
2465 import logging
2467 logging.warning(
2468 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2469 change.new.path,
2470 )
2471 else:
2472 new_paths_normalized[normalized] = change.new.path
2473 new_changes[change.new.path] = change
2475 # Find case-only renames and transform changes
2476 case_only_renames = set()
2477 new_rename_changes = []
2479 for norm_path, old_path in old_paths_normalized.items():
2480 if norm_path in new_paths_normalized:
2481 new_path = new_paths_normalized[norm_path]
2482 if old_path != new_path:
2483 # Found a case-only rename
2484 old_change = old_changes[old_path]
2485 new_change = new_changes[new_path]
2487 # Create a CHANGE_RENAME to replace the DELETE and ADD/MODIFY
2488 # pair; in both cases the old file comes from the DELETE and the
2489 # new file from the ADD or MODIFY, so one construction suffices
2490 rename_change = TreeChange(
2491 CHANGE_RENAME, old_change.old, new_change.new
2492 )
2500 new_rename_changes.append(rename_change)
2502 # Mark the old changes for removal
2503 case_only_renames.add(old_change)
2504 case_only_renames.add(new_change)
2506 # Return new list with original ADD/DELETE changes replaced by renames
2507 result = [change for change in changes if change not in case_only_renames]
2508 result.extend(new_rename_changes)
2509 return result
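
# A minimal sketch of feeding tree_changes output through the case-only
# rename detection above; the repository and tree ids are supplied by
# the caller rather than hard-coded here.
def _example_detect_case_only_renames(
    repo: "Repo", old_tree: bytes, new_tree: bytes
) -> list["TreeChange"]:
    from dulwich.diff_tree import tree_changes

    changes = list(tree_changes(repo.object_store, old_tree, new_tree))
    return detect_case_only_renames(changes, repo.get_config())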
2512def update_working_tree(
2513 repo: "Repo",
2514 old_tree_id: Optional[bytes],
2515 new_tree_id: bytes,
2516 change_iterator: Iterator["TreeChange"],
2517 honor_filemode: bool = True,
2518 validate_path_element: Optional[Callable[[bytes], bool]] = None,
2519 symlink_fn: Optional[
2520 Callable[
2521 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
2522 None,
2523 ]
2524 ] = None,
2525 force_remove_untracked: bool = False,
2526 blob_normalizer: Optional["FilterBlobNormalizer"] = None,
2527 tree_encoding: str = "utf-8",
2528 allow_overwrite_modified: bool = False,
2529) -> None:
2530 """Update the working tree and index to match a new tree.
2532 This function handles:
2533 - Adding new files
2534 - Updating modified files
2535 - Removing deleted files
2536 - Cleaning up empty directories
2538 Args:
2539 repo: Repository object
2540 old_tree_id: SHA of the tree before the update
2541 new_tree_id: SHA of the tree to update to
2542 change_iterator: Iterator of TreeChange objects to apply
2543 honor_filemode: An optional flag to honor core.filemode setting
2544 validate_path_element: Function to validate path elements to check out
2545 symlink_fn: Function to use for creating symlinks
2546 force_remove_untracked: If True, remove files that exist in working
2547 directory but not in target tree, even if old_tree_id is None
2548 blob_normalizer: An optional BlobNormalizer to use for converting line
2549 endings when writing blobs to the working directory.
2550 tree_encoding: Encoding used for tree paths (default: utf-8)
2551 allow_overwrite_modified: If False, raise an error when attempting to
2552 overwrite files that have been modified compared to old_tree_id
2553 """
2554 if validate_path_element is None:
2555 validate_path_element = validate_path_element_default
2557 from .diff_tree import (
2558 CHANGE_ADD,
2559 CHANGE_COPY,
2560 CHANGE_DELETE,
2561 CHANGE_MODIFY,
2562 CHANGE_RENAME,
2563 CHANGE_UNCHANGED,
2564 )
2566 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2567 index = repo.open_index()
2569 # Convert iterator to list since we need multiple passes
2570 changes = list(change_iterator)
2572 # Transform case-only renames on case-insensitive filesystems
2573 import platform
2575 default_ignore_case = platform.system() in ("Windows", "Darwin")
2576 config = repo.get_config()
2577 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
2579 if ignore_case:
2580 # config was already loaded above for the ignorecase lookup
2581 changes = detect_case_only_renames(changes, config)
2583 # Check for path conflicts where files need to become directories
2584 paths_becoming_dirs = set()
2585 for change in changes:
2586 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
2587 assert change.new is not None
2588 path = change.new.path
2589 assert path is not None
2590 if b"/" in path: # This is a file inside a directory
2591 # Check if any parent path exists as a file in the old tree or changes
2592 parts = path.split(b"/")
2593 for i in range(1, len(parts)):
2594 parent = b"/".join(parts[:i])
2595 # See if this parent path is being deleted (was a file, becoming a dir)
2596 for other_change in changes:
2597 if (
2598 other_change.type == CHANGE_DELETE
2599 and other_change.old
2600 and other_change.old.path == parent
2601 ):
2602 paths_becoming_dirs.add(parent)
2604 # Check if any path that needs to become a directory has been modified
2605 for path in paths_becoming_dirs:
2606 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2607 try:
2608 current_stat = os.lstat(full_path)
2609 except FileNotFoundError:
2610 continue # File doesn't exist, nothing to check
2611 except OSError as e:
2612 raise OSError(
2613 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2614 ) from e
2616 if stat.S_ISREG(current_stat.st_mode):
2617 # Find the old entry for this path
2618 old_change = None
2619 for change in changes:
2620 if (
2621 change.type == CHANGE_DELETE
2622 and change.old
2623 and change.old.path == path
2624 ):
2625 old_change = change
2626 break
2628 if old_change:
2629 # Check if file has been modified
2630 assert old_change.old is not None
2631 assert (
2632 old_change.old.sha is not None and old_change.old.mode is not None
2633 )
2634 file_matches = _check_file_matches(
2635 repo.object_store,
2636 full_path,
2637 old_change.old.sha,
2638 old_change.old.mode,
2639 current_stat,
2640 honor_filemode,
2641 blob_normalizer,
2642 path,
2643 )
2644 if not file_matches:
2645 raise OSError(
2646 f"Cannot replace modified file with directory: {path!r}"
2647 )
2649 # Check for uncommitted modifications before making any changes
2650 if not allow_overwrite_modified and old_tree_id:
2651 for change in changes:
2652 # Only check files that are being modified or deleted
2653 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
2654 path = change.old.path
2655 assert path is not None
2656 if path.startswith(b".git") or not validate_path(
2657 path, validate_path_element
2658 ):
2659 continue
2661 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2662 try:
2663 current_stat = os.lstat(full_path)
2664 except FileNotFoundError:
2665 continue # File doesn't exist, nothing to check
2666 except OSError as e:
2667 raise OSError(
2668 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2669 ) from e
2671 if stat.S_ISREG(current_stat.st_mode):
2672 # Check if working tree file differs from old tree
2673 assert change.old.sha is not None and change.old.mode is not None
2674 file_matches = _check_file_matches(
2675 repo.object_store,
2676 full_path,
2677 change.old.sha,
2678 change.old.mode,
2679 current_stat,
2680 honor_filemode,
2681 blob_normalizer,
2682 path,
2683 )
2684 if not file_matches:
2685 from .errors import WorkingTreeModifiedError
2687 raise WorkingTreeModifiedError(
2688 f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
2689 f"would be overwritten by checkout. "
2690 f"Please commit your changes or stash them before you switch branches."
2691 )
2693 # Apply the changes
2694 for change in changes:
2695 if change.type in (CHANGE_DELETE, CHANGE_RENAME):
2696 # Remove file/directory
2697 assert change.old is not None and change.old.path is not None
2698 path = change.old.path
2699 if path.startswith(b".git") or not validate_path(
2700 path, validate_path_element
2701 ):
2702 continue
2704 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2705 try:
2706 delete_stat: Optional[os.stat_result] = os.lstat(full_path)
2707 except FileNotFoundError:
2708 delete_stat = None
2709 except OSError as e:
2710 raise OSError(
2711 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2712 ) from e
2714 _transition_to_absent(repo, path, full_path, delete_stat, index)
2716 if change.type in (
2717 CHANGE_ADD,
2718 CHANGE_MODIFY,
2719 CHANGE_UNCHANGED,
2720 CHANGE_COPY,
2721 CHANGE_RENAME,
2722 ):
2723 # Add or modify file
2724 assert (
2725 change.new is not None
2726 and change.new.path is not None
2727 and change.new.mode is not None
2728 )
2729 path = change.new.path
2730 if path.startswith(b".git") or not validate_path(
2731 path, validate_path_element
2732 ):
2733 continue
2735 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2736 try:
2737 modify_stat: Optional[os.stat_result] = os.lstat(full_path)
2738 except FileNotFoundError:
2739 modify_stat = None
2740 except OSError as e:
2741 raise OSError(
2742 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2743 ) from e
2745 if S_ISGITLINK(change.new.mode):
2746 _transition_to_submodule(
2747 repo, path, full_path, modify_stat, change.new, index
2748 )
2749 else:
2750 _transition_to_file(
2751 repo.object_store,
2752 path,
2753 full_path,
2754 modify_stat,
2755 change.new,
2756 index,
2757 honor_filemode,
2758 symlink_fn,
2759 blob_normalizer,
2760 tree_encoding,
2761 )
2763 index.write()
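
# A minimal checkout sketch driving update_working_tree with the changes
# between two trees. This mirrors how a caller might use the function; it
# is an illustrative assumption, not a verbatim call site.
def _example_update_working_tree(
    repo: "Repo", old_tree: Optional[bytes], new_tree: bytes
) -> None:
    from dulwich.diff_tree import tree_changes

    # tree_changes accepts None for a missing old tree (fresh checkout).
    changes = tree_changes(repo.object_store, old_tree, new_tree)
    update_working_tree(repo, old_tree, new_tree, changes)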
2766def _check_entry_for_changes(
2767 tree_path: bytes,
2768 entry: Union[IndexEntry, ConflictedIndexEntry],
2769 root_path: bytes,
2770 filter_blob_callback: Optional[Callable[[bytes, bytes], bytes]] = None,
2771) -> Optional[bytes]:
2772 """Check a single index entry for changes.
2774 Args:
2775 tree_path: Path in the tree
2776 entry: Index entry to check
2777 root_path: Root filesystem path
2778 filter_blob_callback: Optional callback to filter blobs
2779 Returns: tree_path if changed, None otherwise
2780 """
2781 if isinstance(entry, ConflictedIndexEntry):
2782 # Conflicted files are always unstaged
2783 return tree_path
2785 full_path = _tree_to_fs_path(root_path, tree_path)
2786 try:
2787 st = os.lstat(full_path)
2788 if stat.S_ISDIR(st.st_mode):
2789 if _has_directory_changed(tree_path, entry):
2790 return tree_path
2791 return None
2793 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
2794 return None
2796 blob = blob_from_path_and_stat(full_path, st)
2798 if filter_blob_callback is not None:
2799 blob.data = filter_blob_callback(blob.data, tree_path)
2800 except FileNotFoundError:
2801 # The file was removed, so we assume that counts as
2802 # different from whatever file used to exist.
2803 return tree_path
2804 else:
2805 if blob.id != entry.sha:
2806 return tree_path
2807 return None
2810def get_unstaged_changes(
2811 index: Index,
2812 root_path: Union[str, bytes],
2813 filter_blob_callback: Optional[Callable[..., Any]] = None,
2814 preload_index: bool = False,
2815) -> Generator[bytes, None, None]:
2816 """Walk through an index and check for differences against working tree.
2818 Args:
2819 index: index to check
2820 root_path: path in which to find files
2821 filter_blob_callback: Optional callback to filter blobs
2822 preload_index: If True, use parallel threads to check files (requires threading support)
2823 Returns: iterator over paths with unstaged changes
2824 """
2825 # For each entry in the index, check the sha1 to detect unstaged changes
2826 if not isinstance(root_path, bytes):
2827 root_path = os.fsencode(root_path)
2829 if preload_index:
2830 # Use parallel processing for better performance on slow filesystems
2831 try:
2832 import multiprocessing
2833 from concurrent.futures import ThreadPoolExecutor
2834 except ImportError:
2835 # If threading is not available, fall back to serial processing
2836 preload_index = False
2837 else:
2838 # Collect all entries first
2839 entries = list(index.iteritems())
2841 # Use number of CPUs but cap at 8 threads to avoid overhead
2842 num_workers = min(multiprocessing.cpu_count(), 8)
2844 # Process entries in parallel
2845 with ThreadPoolExecutor(max_workers=num_workers) as executor:
2846 # Submit all tasks
2847 futures = [
2848 executor.submit(
2849 _check_entry_for_changes,
2850 tree_path,
2851 entry,
2852 root_path,
2853 filter_blob_callback,
2854 )
2855 for tree_path, entry in entries
2856 ]
2858 # Yield results in submission order (result() blocks per future)
2859 for future in futures:
2860 result = future.result()
2861 if result is not None:
2862 yield result
2864 if not preload_index:
2865 # Serial processing
2866 for tree_path, entry in index.iteritems():
2867 result = _check_entry_for_changes(
2868 tree_path, entry, root_path, filter_blob_callback
2869 )
2870 if result is not None:
2871 yield result
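
# A short sketch of listing paths with unstaged modifications, roughly
# what a status implementation would do with the generator above.
def _example_get_unstaged_changes(repo: "Repo") -> list[bytes]:
    index = repo.open_index()
    return list(get_unstaged_changes(index, repo.path, preload_index=True))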
2874def _tree_to_fs_path(
2875 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
2876) -> bytes:
2877 """Convert a git tree path to a file system path.
2879 Args:
2880 root_path: Root filesystem path
2881 tree_path: Git tree path as bytes (encoded with tree_encoding)
2882 tree_encoding: Encoding used for tree paths (default: utf-8)
2884 Returns: File system path.
2885 """
2886 assert isinstance(tree_path, bytes)
2887 if os_sep_bytes != b"/":
2888 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
2889 else:
2890 sep_corrected_path = tree_path
2892 # On Windows, we need to handle tree path encoding properly
2893 if sys.platform == "win32":
2894 # Decode from tree encoding, then re-encode for filesystem
2895 try:
2896 tree_path_str = sep_corrected_path.decode(tree_encoding)
2897 sep_corrected_path = os.fsencode(tree_path_str)
2898 except UnicodeDecodeError:
2899 # If decoding fails, use the original bytes
2900 pass
2902 return os.path.join(root_path, sep_corrected_path)
2905def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes:
2906 """Convert a file system path to a git tree path.
2908 Args:
2909 fs_path: File system path.
2910 tree_encoding: Encoding to use for tree paths (default: utf-8)
2912 Returns: Git tree path as bytes (encoded with tree_encoding)
2913 """
2914 if not isinstance(fs_path, bytes):
2915 fs_path_bytes = os.fsencode(fs_path)
2916 else:
2917 fs_path_bytes = fs_path
2919 # On Windows, we need to ensure tree paths are properly encoded
2920 if sys.platform == "win32":
2921 try:
2922 # Decode from filesystem encoding, then re-encode with tree encoding
2923 fs_path_str = os.fsdecode(fs_path_bytes)
2924 fs_path_bytes = fs_path_str.encode(tree_encoding)
2925 except UnicodeDecodeError:
2926 # If filesystem decoding fails, use the original bytes
2927 pass
2929 if os_sep_bytes != b"/":
2930 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
2931 else:
2932 tree_path = fs_path_bytes
2933 return tree_path
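
# An illustrative round trip: on POSIX the two helpers above only
# translate separators, so they invert each other modulo the root
# prefix. The root path is a hypothetical placeholder.
def _example_path_roundtrip() -> None:
    fs_path = _tree_to_fs_path(b"/tmp/repo", b"docs/readme.txt")
    rel = os.path.relpath(fs_path, b"/tmp/repo")
    assert _fs_to_tree_path(rel) == b"docs/readme.txt"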
2936def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
2937 """Create an index entry for a directory.
2939 This is only used for submodules (directories containing .git).
2941 Args:
2942 st: Stat result for the directory
2943 path: Path to the directory
2945 Returns:
2946 IndexEntry for a submodule, or None if not a submodule
2947 """
2948 if os.path.exists(os.path.join(path, b".git")):
2949 head = read_submodule_head(path)
2950 if head is None:
2951 return None
2952 return index_entry_from_stat(st, head, mode=S_IFGITLINK)
2953 return None
2956def index_entry_from_path(
2957 path: bytes, object_store: Optional[ObjectContainer] = None
2958) -> Optional[IndexEntry]:
2959 """Create an index from a filesystem path.
2961 This returns an index value for files, symlinks
2962 and tree references. for directories and
2963 non-existent files it returns None
2965 Args:
2966 path: Path to create an index entry for
2967 object_store: Optional object store to
2968 save new blobs in
2969 Returns: An index entry; None for directories
2970 """
2971 assert isinstance(path, bytes)
2972 st = os.lstat(path)
2973 if stat.S_ISDIR(st.st_mode):
2974 return index_entry_from_directory(st, path)
2976 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
2977 blob = blob_from_path_and_stat(path, st)
2978 if object_store is not None:
2979 object_store.add_object(blob)
2980 return index_entry_from_stat(st, blob.id)
2982 return None
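
# A minimal sketch of staging a single tracked file by hand using the
# function above. The relative path is a hypothetical placeholder.
def _example_index_entry_from_path(repo: "Repo") -> None:
    full_path = _tree_to_fs_path(os.fsencode(repo.path), b"README")
    entry = index_entry_from_path(full_path, object_store=repo.object_store)
    if entry is not None:  # None for directories that are not submodules
        index = repo.open_index()
        index[b"README"] = entry
        index.write()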
2985def iter_fresh_entries(
2986 paths: Iterable[bytes],
2987 root_path: bytes,
2988 object_store: Optional[ObjectContainer] = None,
2989) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
2990 """Iterate over current versions of index entries on disk.
2992 Args:
2993 paths: Paths to iterate over
2994 root_path: Root path to access from
2995 object_store: Optional store to save new blobs in
2996 Returns: Iterator over path, index_entry
2997 """
2998 for path in paths:
2999 p = _tree_to_fs_path(root_path, path)
3000 try:
3001 entry = index_entry_from_path(p, object_store=object_store)
3002 except (FileNotFoundError, IsADirectoryError):
3003 entry = None
3004 yield path, entry
3007def iter_fresh_objects(
3008 paths: Iterable[bytes],
3009 root_path: bytes,
3010 include_deleted: bool = False,
3011 object_store: Optional[ObjectContainer] = None,
3012) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
3013 """Iterate over versions of objects on disk referenced by index.
3015 Args:
3016 paths: Paths to check
3017 root_path: Root path to access from
3018 include_deleted: Include deleted entries with sha and
3019 mode set to None
3020 object_store: Optional object store to report new items to
3021 Returns: Iterator over path, sha, mode
3022 """
3023 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
3024 if entry is None:
3025 if include_deleted:
3026 yield path, None, None
3027 else:
3028 yield path, entry.sha, cleanup_mode(entry.mode)
3031def refresh_index(index: Index, root_path: bytes) -> None:
3032 """Refresh the contents of an index.
3034 This is the equivalent of the staging step of 'git commit -a'.
3036 Args:
3037 index: Index to update
3038 root_path: Root filesystem path
3039 """
3040 for path, entry in iter_fresh_entries(index, root_path):
3041 if entry:
3042 index[path] = entry
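
# A short sketch of refreshing every index entry from the working tree
# and persisting the result; note refresh_index itself does not write.
def _example_refresh_index(repo: "Repo") -> None:
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()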
3045class locked_index:
3046 """Lock the index while making modifications.
3048 Works as a context manager.
3049 """
3051 _file: "_GitFile"
3053 def __init__(self, path: Union[bytes, str]) -> None:
3054 """Initialize locked_index."""
3055 self._path = path
3057 def __enter__(self) -> Index:
3058 """Enter context manager and lock index."""
3059 f = GitFile(self._path, "wb")
3060 self._file = f
3061 self._index = Index(self._path)
3062 return self._index
3064 def __exit__(
3065 self,
3066 exc_type: Optional[type],
3067 exc_value: Optional[BaseException],
3068 traceback: Optional[types.TracebackType],
3069 ) -> None:
3070 """Exit context manager and unlock index."""
3071 if exc_type is not None:
3072 self._file.abort()
3073 return
3074 try:
3075 f = SHA1Writer(self._file)
3076 write_index_dict(f, self._index._byname)
3077 except BaseException:
3078 self._file.abort()
3079 else:
3080 f.close()
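
# An illustrative use of the context manager above: the index lock is
# held for the duration of the block, and the new index is written
# atomically on a clean exit. The unstaged path is a hypothetical
# placeholder.
def _example_locked_index(repo: "Repo") -> None:
    with locked_index(repo.index_path()) as index:
        try:
            del index[b"obsolete.txt"]  # drop a stale entry
        except KeyError:
            pass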