Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 30%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# index.py -- File parser/writer for the git index file
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Parser for the git index file format."""
# Names exported by a star-import of this module.
# NOTE(review): ConflictedIndexEntry is defined in this module and appears in
# the signatures of read_index_dict / read_index_dict_with_version /
# write_index_dict, but is not listed here -- confirm whether that omission
# is intentional.
__all__ = [
    "DEFAULT_VERSION",
    "EOIE_EXTENSION",
    "EXTENDED_FLAG_INTEND_TO_ADD",
    "EXTENDED_FLAG_SKIP_WORKTREE",
    "FLAG_EXTENDED",
    "FLAG_NAMEMASK",
    "FLAG_STAGEMASK",
    "FLAG_STAGESHIFT",
    "FLAG_VALID",
    "HFS_IGNORABLE_CHARS",
    "IEOT_EXTENSION",
    "INVALID_DOTNAMES",
    "REUC_EXTENSION",
    "SDIR_EXTENSION",
    "TREE_EXTENSION",
    "UNTR_EXTENSION",
    "Index",
    "IndexEntry",
    "IndexExtension",
    "ResolveUndoExtension",
    "SerializedIndexEntry",
    "SparseDirExtension",
    "Stage",
    "TreeDict",
    "TreeExtension",
    "UnmergedEntries",
    "UnsupportedIndexFormat",
    "UntrackedExtension",
    "blob_from_path_and_mode",
    "blob_from_path_and_stat",
    "build_file_from_blob",
    "build_index_from_tree",
    "changes_from_tree",
    "cleanup_mode",
    "commit_index",
    "commit_tree",
    "detect_case_only_renames",
    "get_path_element_normalizer",
    "get_unstaged_changes",
    "index_entry_from_stat",
    "make_path_normalizer",
    "pathjoin",
    "pathsplit",
    "read_cache_entry",
    "read_cache_time",
    "read_index",
    "read_index_dict",
    "read_index_dict_with_version",
    "read_index_header",
    "read_submodule_head",
    "update_working_tree",
    "validate_path",
    "validate_path_element_default",
    "validate_path_element_hfs",
    "validate_path_element_ntfs",
    "write_cache_entry",
    "write_cache_time",
    "write_index",
    "write_index_dict",
    "write_index_extension",
]
87import errno
88import os
89import shutil
90import stat
91import struct
92import sys
93import types
94from collections.abc import (
95 Callable,
96 Generator,
97 Iterable,
98 Iterator,
99 Mapping,
100 Sequence,
101 Set,
102)
103from dataclasses import dataclass
104from enum import Enum
105from typing import (
106 IO,
107 TYPE_CHECKING,
108 Any,
109 BinaryIO,
110)
112if TYPE_CHECKING:
113 from .config import Config
114 from .diff_tree import TreeChange
115 from .file import _GitFile
116 from .filters import FilterBlobNormalizer
117 from .object_store import BaseObjectStore
118 from .repo import Repo
120from .file import GitFile
121from .object_store import iter_tree_contents
122from .objects import (
123 S_IFGITLINK,
124 S_ISGITLINK,
125 Blob,
126 ObjectID,
127 Tree,
128 TreeEntry,
129 hex_to_sha,
130 sha_to_hex,
131)
132from .pack import ObjectContainer, SHA1Reader, SHA1Writer
# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, "TreeDict | tuple[int, ObjectID]"]

# --- Bits of the 16-bit ``flags`` field of an index entry ---

# 2-bit stage (during merge); shift right by FLAG_STAGESHIFT to get 0..3
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
# Low 12 bits: length of the entry name as stored in the flags field
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2); when set, a second 16-bit
# ``extended_flags`` field follows the flags field
FLAG_EXTENDED = 0x4000

# --- Bits of the separate ``extended_flags`` field ---
# Note: EXTENDED_FLAG_SKIP_WORKTREE has the same numeric value as
# FLAG_EXTENDED, but they are tested against different fields.

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

# Index format version written when the caller does not request one
DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension
def _encode_varint(value: int) -> bytes:
    """Encode a non-negative integer using Git's variable-width encoding.

    This is the encoding Git uses for OFS_DELTA offsets in packs and for the
    prefix-removal count of index v4 path compression: big-endian groups of
    7 bits, the high bit set on every byte except the last, and an implicit
    "+1" applied on every continuation so that each value has exactly one
    encoding. Values below 128 are encoded as a single byte.

    Args:
      value: Integer to encode (must be >= 0)
    Returns:
      Encoded bytes
    Raises:
      ValueError: If ``value`` is negative
    """
    if value < 0:
        raise ValueError(f"Cannot encode negative value as varint: {value}")

    # Build the bytes least-significant group first, then reverse.
    encoded = [value & 0x7F]  # final byte carries no continuation bit
    value >>= 7
    while value:
        value -= 1  # the decoder adds one back for every continuation byte
        encoded.append(0x80 | (value & 0x7F))
        value >>= 7
    encoded.reverse()
    return bytes(encoded)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer (inverse of _encode_varint).

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    Raises:
      ValueError: If ``data`` ends before the varint is complete
    """
    pos = offset
    if pos >= len(data):
        raise ValueError("Unexpected end of data while reading varint")
    byte = data[pos]
    pos += 1
    value = byte & 0x7F
    while byte & 0x80:  # continuation bit set: another byte follows
        if pos >= len(data):
            raise ValueError("Unexpected end of data while reading varint")
        byte = data[pos]
        pos += 1
        value = ((value + 1) << 7) | (byte & 0x7F)
    return value, pos


def _read_varint_from_stream(f: BinaryIO) -> tuple[int, int]:
    """Read one varint from a stream.

    Args:
      f: File-like object to read from
    Returns:
      tuple of (decoded_value, bytes_consumed)
    Raises:
      ValueError: If the stream ends before the varint is complete
    """
    chunk = f.read(1)
    if not chunk:
        raise ValueError("Unexpected end of file while reading varint")
    byte = chunk[0]
    consumed = 1
    value = byte & 0x7F
    while byte & 0x80:
        chunk = f.read(1)
        if not chunk:
            raise ValueError("Unexpected end of file while reading varint")
        byte = chunk[0]
        consumed += 1
        value = ((value + 1) << 7) | (byte & 0x7F)
    return value, consumed


def _apply_path_delta(previous_path: bytes, remove_len: int, suffix: bytes) -> bytes:
    """Drop ``remove_len`` trailing bytes from ``previous_path`` and append ``suffix``."""
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )
    return previous_path[: len(previous_path) - remove_len] + suffix


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data: varint(number of bytes to strip from the end of
      ``previous_path``) + remaining suffix of ``path`` + NUL
    """
    # Length of the shared leading run of bytes
    common_len = 0
    for ours, theirs in zip(path, previous_path):
        if ours != theirs:
            break
        common_len += 1

    remove_len = len(previous_path) - common_len
    return _encode_varint(remove_len) + path[common_len:] + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    Raises:
      ValueError: If the varint or the NUL-terminated suffix is truncated, or
        if more bytes are to be removed than ``previous_path`` contains
    """
    remove_len, suffix_start = _decode_varint(data, offset)

    # The suffix runs up to (not including) the next NUL byte
    suffix_end = data.find(b"\x00", suffix_start)
    if suffix_end < 0:
        raise ValueError("Unterminated path suffix in compressed entry")

    path = _apply_path_delta(previous_path, remove_len, data[suffix_start:suffix_end])
    return path, suffix_end + 1  # skip the NUL terminator


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    Raises:
      ValueError: If the stream ends inside the varint or the suffix, or if
        more bytes are to be removed than ``previous_path`` contains
    """
    remove_len, bytes_consumed = _read_varint_from_stream(f)

    # Read the suffix until the NUL terminator; a bytearray keeps this linear
    suffix = bytearray()
    while True:
        chunk = f.read(1)
        if not chunk:
            raise ValueError("Unexpected end of file while reading path suffix")
        bytes_consumed += 1
        if chunk == b"\x00":
            break
        suffix += chunk

    return _apply_path_delta(previous_path, remove_len, bytes(suffix)), bytes_consumed
class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts.

    The numeric value is what is stored in the stage bits of an entry's
    ``flags`` field, i.e. ``(flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT``.
    """

    # Regular entry that is not part of a conflict
    NORMAL = 0
    # Stored in the ``ancestor`` slot of a ConflictedIndexEntry
    MERGE_CONFLICT_ANCESTOR = 1
    # Stored in the ``this`` slot of a ConflictedIndexEntry
    MERGE_CONFLICT_THIS = 2
    # Stored in the ``other`` slot of a ConflictedIndexEntry
    MERGE_CONFLICT_OTHER = 3
@dataclass
class SerializedIndexEntry:
    """An index entry in the form in which it is read from / written to disk.

    Unlike IndexEntry, this record carries the entry's path (``name``) and
    keeps the merge stage encoded inside ``flags``.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Decode the merge stage stored in the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        stage_bits = (self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT
        return Stage(stage_bits)

    def is_sparse_dir(self) -> bool:
        """Tell whether this entry is a sparse directory entry.

        The entry qualifies when all of the following hold:
        - its mode is a directory mode
        - the SKIP_WORKTREE extended flag is set
        - its name ends with '/'

        Returns:
          True if entry is a sparse directory entry
        """
        if not stat.S_ISDIR(self.mode):
            return False
        if not self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE:
            return False
        return self.name.endswith(b"/")
@dataclass
class IndexExtension:
    """Base class for index extensions.

    Holds the 4-byte signature and the raw payload; subclasses provide
    structured parsing for the signatures this module knows about.
    """

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Build the extension object matching ``signature``.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          An instance of the dedicated subclass for known signatures,
          otherwise a plain instance of ``cls`` holding the raw data
        """
        known_parsers = (
            (TREE_EXTENSION, TreeExtension),
            (REUC_EXTENSION, ResolveUndoExtension),
            (UNTR_EXTENSION, UntrackedExtension),
            (SDIR_EXTENSION, SparseDirExtension),
        )
        for marker, parser in known_parsers:
            if signature == marker:
                return parser.from_bytes(data)
        # Unknown extension - keep the payload untouched
        return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Return the payload to be written for this extension."""
        return self.data
class TreeExtension(IndexExtension):
    """Tree cache extension (signature ``TREE_EXTENSION``)."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Create a tree cache extension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        super().__init__(TREE_EXTENSION, b"")
        self.entries = entries

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Build a TreeExtension from its on-disk payload.

        Parsing is not implemented yet: ``data`` is ignored and the result
        has no entries.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        no_entries: list[tuple[bytes, bytes, int]] = []
        return cls(no_entries)

    def to_bytes(self) -> bytes:
        """Produce the on-disk payload for this extension.

        Serialization is not implemented yet, so the payload is always empty.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        payload = b""
        return payload
class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Create a resolve-undo extension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        super().__init__(REUC_EXTENSION, b"")
        self.entries = entries

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Build a ResolveUndoExtension from its on-disk payload.

        Parsing is not implemented yet: ``data`` is ignored and the result
        has no entries.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        no_entries: list[tuple[bytes, list[tuple[int, bytes]]]] = []
        return cls(no_entries)

    def to_bytes(self) -> bytes:
        """Produce the on-disk payload for this extension.

        Serialization is not implemented yet, so the payload is always empty.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        payload = b""
        return payload
class UntrackedExtension(IndexExtension):
    """Untracked cache extension; the payload is kept as opaque bytes."""

    def __init__(self, data: bytes) -> None:
        """Create an untracked cache extension.

        Args:
          data: Raw untracked cache data
        """
        raw_payload = data
        super().__init__(UNTR_EXTENSION, raw_payload)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Wrap the on-disk payload without interpreting it.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        extension = cls(data)
        return extension
class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    Marks an index as containing sparse directory entries. Tools that do
    not understand sparse indexes should avoid interacting with an index
    that carries this extension.

    The payload is empty; the presence of the extension is the signal.
    """

    def __init__(self) -> None:
        """Create the (payload-less) sparse directory extension."""
        empty_payload = b""
        super().__init__(SDIR_EXTENSION, empty_payload)

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Build a SparseDirExtension; ``data`` is not inspected.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        extension = cls()
        return extension

    def to_bytes(self) -> bytes:
        """Produce the on-disk payload for this extension.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        payload = b""
        return payload
@dataclass
class IndexEntry:
    """An entry of the Git index, without its path.

    This is the in-memory counterpart of SerializedIndexEntry: the path is
    supplied by whoever stores the entry (for example as a dictionary key).
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Copy every field except the name out of a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        copied_fields = (
            "ctime",
            "mtime",
            "dev",
            "ino",
            "mode",
            "uid",
            "gid",
            "size",
            "sha",
            "flags",
            "extended_flags",
        )
        return cls(**{field: getattr(serialized, field) for field in copied_fields})

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Produce the on-disk form of this entry for a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Replace whatever stage bits are present with those of ``stage``.
        staged_flags = (self.flags & ~FLAG_STAGEMASK) | (
            stage.value << FLAG_STAGESHIFT
        )
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=staged_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Decode the merge stage stored in the flags field.

        Returns:
          Stage enum value
        """
        stage_bits = (self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT
        return Stage(stage_bits)

    @property
    def skip_worktree(self) -> bool:
        """Whether the skip-worktree bit is set in extended_flags."""
        return (self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE) != 0

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Set or clear the skip-worktree bit in extended_flags.

        FLAG_EXTENDED in ``flags`` is kept in step: it is switched on when the
        bit is set, and switched off when clearing leaves no extended flags.
        """
        if not skip:
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Drop the marker bit only once no extended flag is left
            if not self.extended_flags:
                self.flags &= ~FLAG_EXTENDED
            return

        self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
        self.flags |= FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Tell whether this entry is a sparse directory entry.

        The entry qualifies when all of the following hold:
        - its mode is a directory mode
        - the SKIP_WORKTREE extended flag is set
        - ``name`` ends with '/'

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        if not stat.S_ISDIR(self.mode):
            return False
        if not self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE:
            return False
        return name.endswith(b"/")
class ConflictedIndexEntry:
    """Groups the up-to-three entries recorded for one conflicted path."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Create a conflicted entry; every side is optional.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor, self.this, self.other = ancestor, this, other
class UnmergedEntries(Exception):
    """Unmerged entries exist in the index.

    Plain Exception subclass with no extra state.
    NOTE(review): no raise site is visible in this part of the file; the
    exact conditions under which it is raised should be confirmed against
    the callers.
    """
def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path at its last slash.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename; the directory name is empty
      when the path contains no slash
    """
    dirname, slash, basename = path.rpartition(b"/")
    if not slash:
        return (b"", path)
    return (dirname, basename)
def pathjoin(*args: bytes) -> bytes:
    """Join path components with '/', skipping empty components."""
    return b"/".join(filter(None, args))
def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time: two big-endian unsigned 32-bit integers.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    raw = f.read(8)
    secs, nsecs = struct.unpack(">LL", raw)
    return (secs, nsecs)
742def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
743 """Write a cache time.
745 Args:
746 f: File-like object to write to
747 t: Time to write (as int, float or tuple with secs and nsecs)
748 """
749 if isinstance(t, int):
750 t = (t, 0)
751 elif isinstance(t, float):
752 (secs, nsecs) = divmod(t, 1.0)
753 t = (int(secs), int(nsecs * 1000000000))
754 elif not isinstance(t, tuple):
755 raise TypeError(t)
756 f.write(struct.pack(">LL", *t))
def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from (must support ``tell()``)
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    Returns:
      The parsed SerializedIndexEntry; its ``flags`` have the name-length
      bits cleared and its ``sha`` is hex-encoded
    Raises:
      AssertionError: If the extended flag is set in an index with version < 3
      ValueError: If the file ends inside a name whose length field is
        saturated (only raised on that long-name path)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: the path is always prefix-compressed and there is no
        # padding after it
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: the low 12 flag bits give the name length
        name_len = flags & FLAG_NAMEMASK
        name = f.read(name_len)
        terminator_consumed = 0
        if name_len == FLAG_NAMEMASK:
            # The length field is saturated: the name is at least this long
            # and its real end is marked by the first NUL byte.
            overflow = bytearray()
            while True:
                chunk = f.read(1)
                if not chunk:
                    raise ValueError("Unexpected end of file while reading entry name")
                if chunk == b"\x00":
                    break
                overflow += chunk
            name += bytes(overflow)
            terminator_consumed = 1

        # Padding: 1-8 NUL bytes bring the entry to a multiple of 8 bytes.
        # Measure from the end of the name, not counting a NUL already read.
        unpadded_size = f.tell() - beginoffset - terminator_consumed
        real_size = (unpadded_size + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )
def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object (must support ``tell()``)
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    Raises:
      AssertionError: If the entry needs extended flags but version < 3
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    # The name length only has 12 bits; longer names store the saturated
    # value instead of spilling into the stage/extended/valid bits.
    name_len = min(len(entry.name), FLAG_NAMEMASK)
    flags = name_len | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            # The on-disk size field is 32 bits wide; truncate larger sizes
            entry.size & 0xFFFFFFFF,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write the prefix-compressed path, no padding
        f.write(_compress_path(entry.name, previous_path))
    else:
        # Versions < 4: write the full path, then 1-8 NUL bytes so that the
        # entry is NUL-terminated and a multiple of 8 bytes long
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        # Pass the version on so that ``args`` is populated; without it the
        # exception cannot be copied or unpickled (both re-call the
        # constructor with ``args``).
        super().__init__(version)
        self.index_format_version = version

    def __str__(self) -> str:
        """Return a human-readable description including the version."""
        return f"Unsupported index format version: {self.index_format_version}"
def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read and validate the 12-byte index header.

    Args:
      f: File-like object positioned at the start of the index
    Returns:
      tuple of (version, num_entries)
    Raises:
      AssertionError: If the file does not start with the ``DIRC`` signature
      UnsupportedIndexFormat: If the version is not 1, 2, 3 or 4
    """
    magic = f.read(4)
    if magic != b"DIRC":
        raise AssertionError(f"Invalid index file header: {magic!r}")
    version, num_entries = struct.unpack(b">LL", f.read(8))
    if not 1 <= version <= 4:
        raise UnsupportedIndexFormat(version)
    return version, num_entries
def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write one extension: signature, 32-bit big-endian size, then payload.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    payload = extension.to_bytes()
    size_field = struct.pack(">I", len(payload))
    for piece in (extension.signature, size_field, payload):
        f.write(piece)
def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries.

    Only the header and the entries are consumed; each entry's name is
    passed on as ``previous_path`` for the next one (used by version 4).
    """
    version, num_entries = read_index_header(f)
    last_name = b""
    for _ in range(num_entries):
        current = read_cache_entry(f, version, last_name)
        last_name = current.name
        yield current
def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Args:
      f: Seekable file-like object positioned at the start of the index
    Returns:
      tuple of (entries_dict, version, extensions); entries_dict maps each
      path to an IndexEntry, or to a ConflictedIndexEntry when the path has
      staged (conflict) entries
    Raises:
      AssertionError: If a path has both a normal and a conflict-stage entry
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for _ in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
            continue
        existing = ret.setdefault(entry.name, ConflictedIndexEntry())
        if isinstance(existing, IndexEntry):
            raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
        if stage == Stage.MERGE_CONFLICT_ANCESTOR:
            existing.ancestor = IndexEntry.from_serialized(entry)
        elif stage == Stage.MERGE_CONFLICT_THIS:
            existing.this = IndexEntry.from_serialized(entry)
        elif stage == Stage.MERGE_CONFLICT_OTHER:
            existing.other = IndexEntry.from_serialized(entry)

    # Locate the end of the file once; the last 20 bytes are the checksum
    # and must not be interpreted as extension data.
    resume_pos = f.tell()
    f.seek(0, os.SEEK_END)
    eof_pos = f.tell()
    f.seek(resume_pos)

    # Read extensions
    extensions: list[IndexExtension] = []
    while f.tell() < eof_pos - 20:
        signature = f.read(4)
        if len(signature) < 4:
            break

        # A signature is four ASCII letters. Lowercase must be accepted as
        # well: SDIR_EXTENSION (b"sdir") is lowercase and would otherwise
        # never be parsed.
        if not signature.isalpha():
            # Not an extension, seek back
            f.seek(-4, os.SEEK_CUR)
            break

        size_data = f.read(4)
        if len(size_data) < 4:
            break
        (size,) = struct.unpack(">I", size_data)

        data = f.read(size)
        if len(data) < size:
            # Truncated extension payload: stop without recording it
            break

        extensions.append(IndexExtension.from_raw(signature, data))

    return ret, version, extensions
def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    The dictionary is keyed by path. A path with only a normal-stage entry
    maps to an IndexEntry; entries with a conflict stage are collected in a
    ConflictedIndexEntry for their path.

    Args:
      f: File object to read from.
    Raises:
      AssertionError: If a path has both a normal and a conflict-stage entry
    """
    slot_for_stage = {
        Stage.MERGE_CONFLICT_ANCESTOR: "ancestor",
        Stage.MERGE_CONFLICT_THIS: "this",
        Stage.MERGE_CONFLICT_OTHER: "other",
    }
    by_path: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for serialized in read_index(f):
        stage = serialized.stage()
        if stage == Stage.NORMAL:
            by_path[serialized.name] = IndexEntry.from_serialized(serialized)
            continue
        conflict = by_path.setdefault(serialized.name, ConflictedIndexEntry())
        if isinstance(conflict, IndexEntry):
            raise AssertionError(f"Non-conflicted entry for {serialized.name!r} exists")
        setattr(conflict, slot_for_stage[stage], IndexEntry.from_serialized(serialized))
    return by_path
def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Writes the header, the entries in the given order and then the
    extensions. No trailing checksum is written here.

    Args:
      f: File-like object to write to
      entries: Sequence of the entries to write
      version: Version number to write (DEFAULT_VERSION when None); raised
        to 3 automatically if any entry carries extended flags
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags cannot be represented before version 3, so upgrade the
    # format instead of failing when an entry needs them.
    if version < 3 and any(e.extended_flags != 0 for e in entries):
        version = 3

    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    for extension in extensions or ():
        write_index_extension(f, extension)
def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are emitted sorted by path; for a conflicted path the sides
    that are present follow in the order ancestor, this, other.

    Args:
      f: File-like object to write to
      entries: Mapping from path to IndexEntry or ConflictedIndexEntry
      version: Index format version, passed through to write_index
      extensions: Optional extensions, passed through to write_index
    """
    serialized = []
    for path in sorted(entries):
        value = entries[path]
        if not isinstance(value, ConflictedIndexEntry):
            serialized.append(value.serialize(path, Stage.NORMAL))
            continue
        sides = (
            (value.ancestor, Stage.MERGE_CONFLICT_ANCESTOR),
            (value.this, Stage.MERGE_CONFLICT_THIS),
            (value.other, Stage.MERGE_CONFLICT_OTHER),
        )
        for side, stage in sides:
            if side is not None:
                serialized.append(side.serialize(path, stage))

    write_index(f, serialized, version=version, extensions=extensions)
def cleanup_mode(mode: int) -> int:
    """Reduce a mode value to one that can be stored in a tree object.

    Symlinks, directories and gitlinks map to their bare file-type constant.
    Anything else becomes a regular file with permissions 0o755 when the
    owner-execute bit is set and 0o644 otherwise.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    permissions = 0o755 if mode & 0o100 else 0o644
    return stat.S_IFREG | permissions
1115class Index:
1116 """A Git Index file."""
1118 _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]
1120 def __init__(
1121 self,
1122 filename: bytes | str | os.PathLike[str],
1123 read: bool = True,
1124 skip_hash: bool = False,
1125 version: int | None = None,
1126 *,
1127 file_mode: int | None = None,
1128 path_normalizer: Callable[[bytes], bytes] | None = None,
1129 ) -> None:
1130 """Create an index object associated with the given filename.
1132 Args:
1133 filename: Path to the index file
1134 read: Whether to initialize the index from the given file, should it exist.
1135 skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
1136 version: Index format version to use (None = auto-detect from file or use default)
1137 file_mode: Optional file permission mask for shared repository
1138 path_normalizer: Optional function mapping a filesystem path to a
1139 canonical form (e.g. case-folded, NFC-normalized). When provided,
1140 lookups (``index[path]``, ``path in index``, ``del index[path]``)
1141 transparently match paths that normalize to the same form as an
1142 existing entry.
1143 """
1144 self._filename = os.fspath(filename)
1145 # TODO(jelmer): Store the version returned by read_index
1146 self._version = version
1147 self._skip_hash = skip_hash
1148 self._file_mode = file_mode
1149 self._extensions: list[IndexExtension] = []
1150 self._path_normalizer = path_normalizer
1151 self._normalized: dict[bytes, bytes] | None = (
1152 {} if path_normalizer is not None else None
1153 )
1154 self.clear()
1155 if read:
1156 self.read()
def canonical_path(self, name: bytes) -> bytes:
    """Map ``name`` onto the key under which the index stores it.

    ``name`` is returned untouched when no normalizer is configured or
    when an entry is stored under exactly that key.  Otherwise ``name``
    is normalized and, if some stored entry has the same normalized
    form, that entry's key is returned; failing that, ``name`` itself.

    ``index[name]``, ``name in index`` and ``del index[name]`` already do
    this internally.  Call it directly when the same path must also be
    used outside the index (for instance to look it up in a commit tree)
    so both sides agree on the key.
    """
    reverse_map = self._normalized
    if reverse_map is None:
        return name
    if name in self._byname:
        return name
    normalizer = self._path_normalizer
    assert normalizer is not None
    return reverse_map.get(normalizer(name), name)
1177 @property
1178 def path(self) -> bytes | str:
1179 """Get the path to the index file.
1181 Returns:
1182 Path to the index file
1183 """
1184 return self._filename
def __repr__(self) -> str:
    """Return ``ClassName(<filename repr>)`` for debugging output."""
    class_name = self.__class__.__name__
    return f"{class_name}({self._filename!r})"
def write(self) -> None:
    """Serialize the in-memory index to its file on disk."""
    mask = 0o644 if self._file_mode is None else self._file_mode
    f = GitFile(self._filename, "wb", mask=mask)
    try:
        # Only extensions that serialize to a non-empty payload are written.
        meaningful_extensions = [
            ext for ext in self._extensions if ext.to_bytes()
        ]

        if not self._skip_hash:
            sha1_writer = SHA1Writer(f)
            write_index_dict(
                sha1_writer,
                self._byname,
                version=self._version,
                extensions=meaningful_extensions,
            )
            sha1_writer.close()
        else:
            # skipHash mode: no SHA1 is computed over the contents ...
            write_index_dict(
                f,
                self._byname,
                version=self._version,
                extensions=meaningful_extensions,
            )
            # ... and 20 zero bytes take the place of the checksum.
            f.write(b"\x00" * 20)
            f.close()
    except BaseException:
        # NOTE(review): the file is closed (not explicitly discarded) on
        # failure; confirm against GitFile whether close() here can publish
        # a partially written index and whether abort() should be used.
        f.close()
        raise
def read(self) -> None:
    """Load entries, version and extensions from the index file.

    Does nothing when the file does not exist.  Entries read from disk
    are merged into the current contents via ``update()``.
    """
    if not os.path.exists(self._filename):
        return
    f = GitFile(self._filename, "rb")
    try:
        reader = SHA1Reader(f)
        parsed = read_index_dict_with_version(reader)
        entries, version, extensions = parsed
        self._version = version
        self._extensions = extensions
        self.update(entries)
        # Extensions were consumed by read_index_dict_with_version, so the
        # checksum can be verified right away.
        reader.check_sha(allow_empty=True)
    finally:
        f.close()
def __len__(self) -> int:
    """Return how many paths are stored in this index."""
    entry_count = len(self._byname)
    return entry_count
def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
    """Look up the entry stored for a relative path.

    The key is first resolved through ``canonical_path``.

    Returns: Either a IndexEntry or a ConflictedIndexEntry
    Raises KeyError: if the entry does not exist
    """
    stored_key = self.canonical_path(key)
    return self._byname[stored_key]
def __iter__(self) -> Iterator[bytes]:
    """Return an iterator over the paths stored in this index."""
    stored_paths = self._byname
    return iter(stored_paths)
def __contains__(self, key: bytes) -> bool:
    """Tell whether an entry exists for ``key`` (after canonicalization)."""
    stored_key = self.canonical_path(key)
    return stored_key in self._byname
def get_sha1(self, path: bytes) -> ObjectID:
    """Return the (git object) SHA1 recorded for ``path``.

    Raises:
      KeyError: if there is no entry for ``path``
      UnmergedEntries: if the entry is conflicted
    """
    entry = self[path]
    if not isinstance(entry, ConflictedIndexEntry):
        return entry.sha
    raise UnmergedEntries
def get_mode(self, path: bytes) -> int:
    """Return the POSIX file mode recorded for ``path``.

    Raises:
      KeyError: if there is no entry for ``path``
      UnmergedEntries: if the entry is conflicted
    """
    entry = self[path]
    if not isinstance(entry, ConflictedIndexEntry):
        return entry.mode
    raise UnmergedEntries
def iterobjects(self) -> Iterable[tuple[bytes, ObjectID, int]]:
    """Yield ``(path, sha, mode)`` tuples suitable for ``commit_tree``.

    The mode is passed through ``cleanup_mode``.

    Raises:
      UnmergedEntries: as soon as a conflicted entry is encountered
    """
    for entry_path in self:
        current = self[entry_path]
        if isinstance(current, ConflictedIndexEntry):
            raise UnmergedEntries
        yield entry_path, current.sha, cleanup_mode(current.mode)
def has_conflicts(self) -> bool:
    """Report whether any stored entry is conflicted.

    Returns:
      True if at least one entry is a ConflictedIndexEntry, False otherwise
    """
    return any(
        isinstance(entry, ConflictedIndexEntry)
        for entry in self._byname.values()
    )
def clear(self) -> None:
    """Drop every entry from this index.

    The normalized-path lookup table is reset as well, but only when
    normalization is enabled (i.e. the table is not ``None``).
    """
    self._byname = {}
    if self._normalized is None:
        return
    self._normalized = {}
def __setitem__(
    self, name: bytes, value: IndexEntry | ConflictedIndexEntry
) -> None:
    """Store ``value`` under ``name``.

    ``name`` is canonicalized first, so assigning to a path that
    normalizes to an existing entry replaces that entry.  For keys that
    were not present before, the normalized form is registered in the
    lookup table (without overriding an existing registration).
    """
    assert isinstance(name, bytes)
    key = self.canonical_path(name)
    already_present = key in self._byname
    self._byname[key] = value
    if already_present or self._normalized is None:
        return
    assert self._path_normalizer is not None
    self._normalized.setdefault(self._path_normalizer(key), key)
def __delitem__(self, name: bytes) -> None:
    """Remove the entry for ``name`` (after canonicalization).

    Raises:
      KeyError: if no such entry exists
    """
    key = self.canonical_path(name)
    del self._byname[key]
    lookup = self._normalized
    if lookup is None:
        return
    assert self._path_normalizer is not None
    folded = self._path_normalizer(key)
    # Only drop the registration if it actually points at the removed key.
    if lookup.get(folded) == key:
        del lookup[folded]
def iteritems(
    self,
) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
    """Return an iterator of ``(path, entry)`` pairs.

    Returns:
      Iterator of (path, entry) tuples
    """
    pairs = self._byname.items()
    return iter(pairs)
def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
    """Return an iterator over the stored ``(path, entry)`` pairs.

    Returns:
      Iterator of (path, entry) tuples
    """
    pairs = self._byname.items()
    return iter(pairs)
def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
    """Store every entry of ``entries`` in the index.

    Each pair goes through item assignment on the index, so path
    canonicalization applies to every key.

    Args:
      entries: Dictionary mapping paths to index entries
    """
    for entry_path in entries:
        self[entry_path] = entries[entry_path]
def paths(self) -> Generator[bytes, None, None]:
    """Yield each path stored in the index.

    Yields:
      Path names as bytes
    """
    for stored_path in self._byname:
        yield stored_path
def changes_from_tree(
    self,
    object_store: ObjectContainer,
    tree: ObjectID,
    want_unchanged: bool = False,
) -> Generator[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ],
    None,
    None,
]:
    """Yield the differences between this index and a tree.

    Delegates to the module-level ``changes_from_tree`` using the index
    paths as names and the index entries as the "working copy" side.

    Args:
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
      newmode), (oldsha, newsha)
    """

    def lookup_entry(path: bytes) -> tuple[bytes, int]:
        entry = self[path]
        if not (hasattr(entry, "sha") and hasattr(entry, "mode")):
            # Entries lacking sha/mode (the ConflictedIndexEntry case) are
            # reported with an empty sha and mode 0.
            return b"", 0
        return entry.sha, cleanup_mode(entry.mode)

    yield from changes_from_tree(
        self.paths(),
        lookup_entry,
        object_store,
        tree,
        want_unchanged=want_unchanged,
    )
def commit(self, object_store: ObjectContainer) -> ObjectID:
    """Write the index contents as tree objects.

    Args:
      object_store: Object store to save the tree in
    Returns:
      Root tree SHA
    """
    blobs = self.iterobjects()
    return commit_tree(object_store, blobs)
def is_sparse(self) -> bool:
    """Tell whether the sparse directory extension is attached.

    Returns:
      True if any sparse directory extension is present
    """
    for ext in self._extensions:
        if isinstance(ext, SparseDirExtension):
            return True
    return False
def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
    """Turn a sparse index into a full one.

    Every sparse directory entry is removed and replaced by entries for
    all files found (recursively) in the tree it points to.  Afterwards
    the sparse directory extension is dropped.  Nothing happens when the
    index is not sparse.

    Args:
      object_store: Object store to read tree objects from

    Raises:
      KeyError: If a tree object referenced by a sparse dir entry doesn't exist
      ValueError: If a sparse dir entry points at something that is not a tree
    """
    if not self.is_sparse():
        return

    # Collect first; the index is mutated while expanding below.
    sparse_dirs = [
        (path, entry)
        for path, entry in self._byname.items()
        if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path)
    ]

    for path, entry in sparse_dirs:
        # The placeholder goes away before its contents are added.
        del self[path]

        tree = object_store[entry.sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"Sparse directory {path!r} points to non-tree object")

        self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

    # The index is full now, so the marker extension no longer applies.
    self._extensions = [
        ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
    ]
def _expand_tree(
    self,
    prefix: bytes,
    tree: Tree,
    object_store: "BaseObjectStore",
    template_entry: IndexEntry,
) -> None:
    """Add index entries for every file below ``tree``, recursively.

    Args:
      prefix: Path prefix for entries (without trailing slash)
      tree: Tree object to expand
      object_store: Object store to read nested trees from
      template_entry: Entry whose timestamps, dev/ino and uid/gid are
        copied into each created entry

    Raises:
      ValueError: If a directory entry refers to a non-tree object
    """
    for name, mode, sha in tree.items():
        full_path = prefix + b"/" + name if prefix else name

        if not stat.S_ISDIR(mode):
            # Regular entry: metadata comes from the template, while sha and
            # mode come from the tree.  Size cannot be derived from a tree,
            # and flags are reset so skip-worktree is not carried over.
            self[full_path] = IndexEntry(
                ctime=template_entry.ctime,
                mtime=template_entry.mtime,
                dev=template_entry.dev,
                ino=template_entry.ino,
                mode=mode,
                uid=template_entry.uid,
                gid=template_entry.gid,
                size=0,
                sha=sha,
                flags=0,
                extended_flags=0,
            )
            continue

        # Directory: descend into the subtree.
        subtree = object_store[sha]
        if not isinstance(subtree, Tree):
            raise ValueError(
                f"Directory entry {full_path!r} points to non-tree object"
            )
        self._expand_tree(full_path, subtree, object_store, template_entry)
def convert_to_sparse(
    self,
    object_store: "BaseObjectStore",
    tree_sha: ObjectID,
    sparse_dirs: Set[bytes],
) -> None:
    """Convert full index entries to sparse directory entries.

    This collapses directories that are entirely outside the sparse
    checkout cone into single sparse directory entries.

    Args:
      object_store: Object store to read tree objects
      tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
      sparse_dirs: Set of directory paths to collapse.  Paths are expected
        to carry a trailing ``/``; a missing trailing slash is tolerated
        and added before matching.

    Raises:
      KeyError: If tree_sha or a subdirectory doesn't exist
      ValueError: If tree_sha does not refer to a tree object
    """
    if not sparse_dirs:
        return

    # Imported once up front rather than on every loop iteration.
    from dulwich.objects import ObjectID

    # Get the base tree
    tree = object_store[tree_sha]
    if not isinstance(tree, Tree):
        raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

    # For each sparse directory, find its tree SHA and create sparse entry
    for dir_path in sparse_dirs:
        dir_path_stripped = dir_path.rstrip(b"/")
        # Always match on "<dir>/" so that collapsing "foo" cannot swallow
        # siblings such as "foobar/x" when the caller omitted the trailing
        # slash.  A root-like path (empty after stripping) is left as given.
        if dir_path_stripped:
            dir_prefix = dir_path_stripped + b"/"
        else:
            dir_prefix = dir_path

        # Find the tree SHA for this directory
        subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
        if subtree_sha is None:
            # Directory doesn't exist in tree, skip it
            continue

        # Remove all entries under this directory (collected first because
        # deleting mutates the mapping being scanned).
        entries_to_remove = [
            path
            for path in self._byname
            if path.startswith(dir_prefix) or path == dir_path_stripped
        ]
        for path in entries_to_remove:
            del self[path]

        # Create a sparse directory entry
        # Use minimal metadata since it's not a real file
        sparse_entry = IndexEntry(
            ctime=0,
            mtime=0,
            dev=0,
            ino=0,
            mode=stat.S_IFDIR,
            uid=0,
            gid=0,
            size=0,
            sha=ObjectID(subtree_sha),
            flags=0,
            extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
        )
        self[dir_prefix] = sparse_entry

    # Add sparse directory extension if not present
    if not self.is_sparse():
        self._extensions.append(SparseDirExtension())
def _find_subtree_sha(
    self,
    tree: Tree,
    path: bytes,
    object_store: "BaseObjectStore",
) -> bytes | None:
    """Resolve ``path`` inside ``tree`` to the id of the subtree found there.

    Args:
      tree: Root tree object to search in
      path: Path to the subdirectory (no trailing slash)
      object_store: Object store to read nested trees from

    Returns:
      SHA of the subtree, or None if path doesn't exist (or a component
      is not a directory / does not load as a tree)
    """
    if not path:
        return tree.id

    cursor = tree
    for component in path.split(b"/"):
        try:
            component_mode, component_sha = cursor[component]
        except KeyError:
            # No such name in the current tree.
            return None

        if not stat.S_ISDIR(component_mode):
            # A file is in the way.
            return None

        child = object_store[component_sha]
        if not isinstance(child, Tree):
            return None
        cursor = child

    return cursor.id
def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, ObjectID, int]]
) -> ObjectID:
    """Build tree objects for a flat list of blobs and store them.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Directory path -> in-memory description of that directory; the root
    # directory is keyed by the empty path.
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        """Return the dict for ``path``, creating it and its parents."""
        known = trees.get(path)
        if known is not None:
            return known
        dirname, basename = pathsplit(path)
        parent = add_tree(dirname)
        assert isinstance(basename, bytes)
        created: TreeDict = {}
        parent[basename] = created
        trees[path] = created
        return created

    for blob_path, blob_sha, blob_mode in blobs:
        directory, leaf = pathsplit(blob_path)
        add_tree(directory)[leaf] = (blob_mode, blob_sha)

    def build_tree(path: bytes) -> ObjectID:
        """Create, store and return the id of the Tree for ``path``."""
        tree = Tree()
        for basename, entry in trees[path].items():
            if not isinstance(entry, dict):
                (mode, sha) = entry
            else:
                # Nested directory: build it first to learn its id.
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
def commit_index(object_store: ObjectContainer, index: Index) -> ObjectID:
    """Write the contents of an index as tree objects.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    blobs = index.iterobjects()
    return commit_tree(object_store, blobs)
def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: ObjectID | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy; must
        raise KeyError for names it does not know
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                # ``lookup_entry`` may resolve a tree name that is not
                # literally present in ``names`` (e.g. an index lookup that
                # normalizes paths), so a missing name must not raise here.
                other_names.discard(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            # Listed but not resolvable: nothing to report for this name.
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))
def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Build an index entry out of a stat result and an object id.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    from dulwich.objects import ObjectID

    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    # Timestamps are taken from the integer nanosecond fields when the stat
    # object has them and split into (seconds, nanoseconds), which avoids
    # the precision loss of the float fields.
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]

    ctime_ns = getattr(stat_val, "st_ctime_ns", None)
    if ctime_ns is None:
        ctime = stat_val.st_ctime
    else:
        ctime = divmod(ctime_ns, 1_000_000_000)

    mtime_ns = getattr(stat_val, "st_mtime_ns", None)
    if mtime_ns is None:
        mtime = stat_val.st_mtime
    else:
        mtime = divmod(mtime_ns, 1_000_000_000)

    return IndexEntry(
        ctime=ctime,
        mtime=mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=ObjectID(hex_sha),
        flags=0,
        extended_flags=0,
    )
if sys.platform != "win32":
    symlink = os.symlink
else:
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Permission failure while creating a symlink on Windows.

        Raised in place of a plain PermissionError so that the message can
        point at the usual cause: developer mode or administrator
        privileges are missing.
        """

        def __init__(self, errno: int, msg: str, filename: str | None) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            hint = f"Unable to create symlink; do you have developer mode enabled? {msg}"
            super().__init__(errno, hint, filename)

    def symlink(
        src: str | bytes,
        dst: str | bytes,
        target_is_directory: bool = False,
        *,
        dir_fd: int | None = None,
    ) -> None:
        """Wrap ``os.symlink`` with a more helpful permission error.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as exc:
            raise WindowsSymlinkPermissionError(
                exc.errno or 0, exc.strerror or "", exc.filename
            ) from exc
def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # Symlink creation on Windows is done with unicode strings.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and stat.S_ISLNK(oldstat.st_mode):
            # A symlink currently occupies the path.  open() and os.chmod()
            # follow links, so writing now would modify whatever the link
            # points at (and the "unchanged" shortcut below would compare
            # against the link target).  Remove the link itself first.
            _remove_file_with_readonly_handling(target_path)
            oldstat = None

        if oldstat is not None and oldstat.st_size == len(contents):
            # Same size: skip the write when the bytes already match.
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)
# Path element names (compared after normalization) that the path element
# validators in this module reject.
INVALID_DOTNAMES = (b".git", b".", b"..", b"")
def _normalize_path_element_default(element: bytes) -> bytes:
    """Lower-case a path element for case-insensitive comparison."""
    lowered = element.lower()
    return lowered
def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize a path element for NTFS.

    Trailing dots and spaces are stripped, then the element is lower-cased.
    """
    trimmed = element.rstrip(b". ")
    return trimmed.lower()
def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize a path element for HFS+.

    The element is decoded as UTF-8, stripped of the code points listed in
    ``HFS_IGNORABLE_CHARS``, NFD-normalized and lower-cased.

    Raises:
      UnicodeDecodeError: if ``element`` is not valid UTF-8 (deliberately
        left to the caller)
    """
    import unicodedata

    text = element.decode("utf-8", errors="strict")
    kept = [ch for ch in text if ord(ch) not in HFS_IGNORABLE_CHARS]
    decomposed = unicodedata.normalize("NFD", "".join(kept))
    return decomposed.lower().encode("utf-8", errors="strict")
def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Pick the path element normalizer matching the repository config.

    ``core.protectNTFS`` (default: true when ``os.name`` is ``"nt"``) is
    checked first, then ``core.protectHFS`` (default: true on darwin);
    otherwise the default normalizer is used.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    if config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    return _normalize_path_element_default
1928def make_path_normalizer(
1929 config: "Config",
1930) -> Callable[[bytes], bytes] | None:
1931 """Build a path normalizer honoring ``core.ignorecase`` and ``core.precomposeunicode``.
1933 The returned callable maps a filesystem-form path to a canonical form
1934 used to match equivalent paths (e.g. ``Foo.txt`` ↔ ``foo.txt`` when
1935 ``core.ignorecase=true``, NFD ↔ NFC when ``core.precomposeunicode=true``).
1936 Returns ``None`` when neither option is active so callers can skip the
1937 comparison entirely.
1938 """
1939 ignorecase = config.get_boolean(b"core", b"ignorecase", False)
1940 precompose = config.get_boolean(b"core", b"precomposeunicode", False)
1941 if not ignorecase and not precompose:
1942 return None
1944 def normalize(path: bytes) -> bytes:
1945 if precompose:
1946 import unicodedata
1948 try:
1949 path = unicodedata.normalize("NFC", path.decode("utf-8")).encode(
1950 "utf-8"
1951 )
1952 except UnicodeDecodeError:
1953 pass
1954 if ignorecase:
1955 path = path.lower()
1956 return path
1958 return normalize
def validate_path_element_default(element: bytes) -> bool:
    """Check a path element against the default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    normalized = _normalize_path_element_default(element)
    return normalized not in INVALID_DOTNAMES
def validate_path_element_ntfs(element: bytes) -> bool:
    """Check a path element against the NTFS rules.

    Besides the invalid dot-names, the 8.3 short name ``git~1`` is
    rejected.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    return normalized not in INVALID_DOTNAMES and normalized != b"git~1"
# HFS+ ignorable Unicode codepoints (from Git's utf8.c).
# Characters with these code points are dropped by
# _normalize_path_element_hfs before a path element is compared.
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}
def validate_path_element_hfs(element: bytes) -> bool:
    """Check a path element against the HFS+ rules.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    Elements that are not valid UTF-8 are rejected.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Reject the invalid dot-names as well as the 8.3 short name.
    return normalized not in INVALID_DOTNAMES and normalized != b"git~1"
def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Validate every ``/``-separated element of ``path``.

    Returns True only if ``element_validator`` accepts all elements; the
    default validator just checks for .git/ and similar names.
    """
    return all(element_validator(part) for part in path.split(b"/"))
def build_index_from_tree(
    root_path: str | bytes,
    index_path: str | bytes,
    object_store: ObjectContainer,
    tree_id: ObjectID,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Check out a tree into a directory and write a matching index.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        assert (
            entry.path is not None and entry.mode is not None and entry.sha is not None
        )
        # Paths failing validation are silently left out of the checkout.
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        parent_dir = os.path.dirname(full_path)
        if not os.path.exists(parent_dir):
            os.makedirs(parent_dir)

        # TODO(jelmer): Merge new index into working tree
        is_gitlink = S_ISGITLINK(entry.mode)
        if is_gitlink:
            # Submodules are represented by a (possibly empty) directory.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if is_gitlink or not honor_filemode:
            # Rebuild the stat result with the tree's mode in place of the
            # on-disk one.
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st = st.__class__(
                (
                    entry.mode,
                    st.st_ino,
                    st.st_dev,
                    st.st_nlink,
                    st.st_uid,
                    st.st_gid,
                    st.st_size,
                    st.st_atime,
                    st.st_mtime,
                    st.st_ctime,
                )
            )
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    For symlink modes the blob holds the link target; otherwise it holds
    the file contents.

    Args:
      fs_path: Full file system path to file
      mode: File mode
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if not stat.S_ISLNK(mode):
        with open(fs_path, "rb") as f:
            blob.data = f.read()
        return blob

    if sys.platform == "win32":
        # os.readlink on Python3 on Windows requires a unicode string.
        link_target = os.readlink(os.fsdecode(fs_path))
        blob.data = link_target.encode(tree_encoding)
    else:
        blob.data = os.readlink(fs_path)
    return blob
def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Only ``st.st_mode`` is consulted; the work is done by
    ``blob_from_path_and_mode``.

    Args:
      fs_path: Full file system path to file
      st: A stat object
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    file_mode = st.st_mode
    return blob_from_path_and_mode(fs_path, file_mode, tree_encoding)
def read_submodule_head(path: str | bytes) -> bytes | None:
    """Return the HEAD commit of the submodule located at ``path``.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    repo_path = path if isinstance(path, str) else os.fsdecode(path)
    try:
        repo = Repo(repo_path)
    except NotGitRepository:
        return None
    else:
        try:
            return repo.head()
        except KeyError:
            # The repository exists but HEAD cannot be resolved.
            return None
def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Decide whether a directory found in place of a tracked path counts as changed.

    Intended to be called after creating a blob from ``tree_path`` failed.
    If the directory contains a ``.git`` entry it is treated as a
    submodule and compared by HEAD: it is changed when the submodule head
    differs from ``entry.sha``.  Without ``.git`` the tracked file has
    been replaced by a plain directory, which always counts as changed.

    Returns True if the path should be considered changed, False when it
    is a submodule whose head still matches the entry.
    """
    if not os.path.exists(os.path.join(tree_path, b".git")):
        # The file was changed to a directory, so consider it removed.
        return True

    # Submodule
    head = read_submodule_head(tree_path)
    if entry.sha != head:
        return True
    return False
# The platform path separator as bytes; used when translating between
# "/"-separated tree paths and native filesystem paths.
os_sep_bytes = os.sep.encode("ascii")
def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure the parent directory of ``full_path`` exists.

    Missing directories are created. Before creating anything, the nearest
    existing ancestor is inspected so that a regular file sitting where a
    directory is needed is reported with a clear error.

    Args:
      full_path: Path of the file whose parent directory is required

    Raises:
      OSError: if the nearest existing ancestor is not a directory, or if
        the directories cannot be created
    """
    parent_dir = os.path.dirname(full_path)
    if not parent_dir or os.path.exists(parent_dir):
        return

    # Walk up the directory tree to find the first existing ancestor.
    current = parent_dir
    while current and not os.path.exists(current):
        new_parent = os.path.dirname(current)
        if new_parent == current:
            # Reached the root or can't go up further
            break
        current = new_parent

    # Every level below ``current`` was just seen to be missing, so only the
    # existing ancestor itself can be blocking directory creation.
    if current and os.path.exists(current) and not os.path.isdir(current):
        raise OSError(f"Cannot create directory, parent path is a file: {current!r}")

    # exist_ok tolerates another process creating the directory concurrently.
    os.makedirs(parent_dir, exist_ok=True)
def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Delete a file, coping with read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        if sys.platform != "win32":
            raise
        # On Windows, remove read-only attribute and retry
        os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
        os.unlink(path)
def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Prune empty directories above ``path``, never removing ``stop_at``.

    Walks upwards from the parent of ``path`` and stops at the first
    directory that is missing or not empty. Other OS errors are propagated.
    """
    directory = os.path.dirname(path)
    while directory and directory != stop_at:
        try:
            os.rmdir(directory)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            return
        except OSError as exc:
            if exc.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                raise
            # Directory not empty - stop trying
            return
        directory = os.path.dirname(directory)
def _check_symlink_matches(
    full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: ObjectID
) -> bool:
    """Compare an on-disk symlink target with the target stored in a blob.

    Returns True when the link points where the object says it should, and
    False when it points elsewhere, is missing, or is not a symlink at all.
    """
    try:
        link_target = os.readlink(full_path)
        wanted_target = repo_object_store[entry_sha].as_raw_string()
        if isinstance(link_target, str):
            link_target = link_target.encode()
        return link_target == wanted_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as exc:
        if exc.errno != errno.EINVAL:
            raise
        # Not a symlink
        return False
def _check_file_matches(
    repo_object_store: "BaseObjectStore",
    full_path: bytes,
    entry_sha: ObjectID,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_path: bytes | None = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Args:
      repo_object_store: Object store holding the expected blob
      full_path: Filesystem path of the file to inspect
      entry_sha: Id of the expected blob
      entry_mode: Expected file mode
      current_stat: Stat result for ``full_path``
      honor_filemode: If True, the executable bit must match as well
      blob_normalizer: Optional normalizer; when given together with
        ``tree_path`` the file is compared against the checkout-normalized
        blob content instead of the raw blob
      tree_path: Tree path passed to the normalizer

    Returns True if file matches, False if it doesn't match (including when
    the file cannot be read).
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    blob_obj = repo_object_store[entry_sha]

    try:
        if blob_normalizer and tree_path is not None:
            # A checkout filter (e.g. line-ending conversion) may change the
            # length of the content, so the on-disk size must be compared
            # with the normalized content, not with the raw blob.
            assert isinstance(blob_obj, Blob)
            normalized_blob = blob_normalizer.checkout_normalize(blob_obj, tree_path)
            expected_content = normalized_blob.as_raw_string()
            if current_stat.st_size != len(expected_content):
                return False
        else:
            # Cheap size comparison before reading the file.
            if current_stat.st_size != blob_obj.raw_length():
                return False
            expected_content = blob_obj.as_raw_string()

        # Size matches, check actual content
        with open(full_path, "rb") as f:
            current_content = f.read()
        return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False
def _transition_to_submodule(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    entry: IndexEntry | TreeEntry,
    index: Index,
) -> None:
    """Turn whatever is at ``full_path`` into a submodule placeholder.

    An existing directory is kept as is; any other existing entry is removed
    first. The index entry for ``path`` is then refreshed from the stat of
    the resulting placeholder.
    """
    from .submodule import ensure_submodule_placeholder

    in_the_way = current_stat is not None and not stat.S_ISDIR(current_stat.st_mode)
    if in_the_way:
        # Something other than a directory occupies the path: remove it.
        _remove_file_with_readonly_handling(full_path)

    # Creates the placeholder, or just makes sure the .git file exists when
    # the directory is already there.
    ensure_submodule_placeholder(repo, path)

    placeholder_stat = os.lstat(full_path)
    assert entry.sha is not None
    index[path] = index_entry_from_stat(placeholder_stat, entry.sha)
def _transition_to_file(
    object_store: "BaseObjectStore",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    entry: IndexEntry | TreeEntry,
    index: Index,
    honor_filemode: bool,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None,
    blob_normalizer: "FilterBlobNormalizer | None",
    tree_encoding: str = "utf-8",
) -> None:
    """Make ``full_path`` a regular file or symlink matching ``entry``.

    If the path already holds matching content, only the index entry is
    refreshed. Otherwise whatever is there is removed (an empty directory,
    a bare submodule placeholder or a file) and the blob is written out.

    Raises:
      IsADirectoryError: if a non-empty directory, or a submodule directory
        containing other files, is in the way
    """
    assert entry.sha is not None and entry.mode is not None

    # Work out whether the working tree already has the wanted content.
    already_current = False
    if current_stat is not None:
        wants_symlink = stat.S_ISLNK(entry.mode)
        if stat.S_ISREG(current_stat.st_mode) and not wants_symlink:
            # File to file
            already_current = _check_file_matches(
                object_store,
                full_path,
                entry.sha,
                entry.mode,
                current_stat,
                honor_filemode,
                blob_normalizer,
                path,
            )
        elif stat.S_ISLNK(current_stat.st_mode) and wants_symlink:
            # Symlink to symlink
            already_current = _check_symlink_matches(
                full_path, object_store, entry.sha
            )

    if already_current:
        # Nothing to write; just record the current stat in the index.
        assert current_stat is not None
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Clear out whatever currently occupies the path.
    if current_stat is not None:
        if stat.S_ISDIR(current_stat.st_mode):
            listing = set(os.listdir(full_path))
            git_marker = b".git" if isinstance(full_path, bytes) else ".git"

            if git_marker not in listing:
                # Plain directory: only removable when empty.
                try:
                    os.rmdir(full_path)
                except OSError as err:
                    if err.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                        raise
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
            elif listing == {git_marker}:
                # Submodule placeholder holding nothing but its .git entry.
                shutil.rmtree(full_path)
            else:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
        else:
            _remove_file_with_readonly_handling(full_path)

    _ensure_parent_dir_exists(full_path)

    # Materialise the blob, applying checkout normalization when configured.
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    written_stat = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(written_stat, entry.sha)
def _transition_to_absent(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: os.stat_result | None,
    index: Index,
) -> None:
    """Remove the working tree entry at ``full_path`` and its index entry.

    Does nothing at all when the path does not exist. A directory that is
    not empty is left in place. Afterwards, empty parent directories are
    pruned up to the repository root.
    """
    if current_stat is None:
        return

    if not stat.S_ISDIR(current_stat.st_mode):
        _remove_file_with_readonly_handling(full_path)
    else:
        listing = set(os.listdir(full_path))
        git_marker = b".git" if isinstance(full_path, bytes) else ".git"

        if listing == {git_marker}:
            # Submodule placeholder holding nothing but its .git entry.
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as err:
                # A non-empty directory is tolerated and left behind.
                if err.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    repo_root = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    _remove_empty_parents(full_path, repo_root)
def detect_case_only_renames(
    changes: Sequence["TreeChange"],
    config: "Config",
) -> list["TreeChange"]:
    """Rewrite changes whose old and new paths differ only after normalization.

    Paths are normalized element by element with the normalizer selected by
    the repository configuration. When a removed path (DELETE or the old side
    of a RENAME) and an introduced path (ADD, MODIFY, COPY or the new side of
    a RENAME) normalize to the same value but are spelled differently (e.g.
    README.txt -> readme.txt), the two changes are replaced by a single
    CHANGE_RENAME.

    Args:
      changes: List of TreeChange objects representing file changes
      config: Repository configuration object

    Returns:
      New list of TreeChange objects with case-only renames converted to
      CHANGE_RENAME; the generated renames are appended at the end
    """
    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        TreeChange,
    )

    # Get the appropriate normalizer based on config
    fold_element = get_path_element_normalizer(config)

    def fold_path(path: bytes) -> bytes:
        """Normalize every element of a tree path."""
        return b"/".join(fold_element(part) for part in path.split(b"/"))

    # For each side: normalized path -> real path, and real path -> change.
    old_by_folded = {}
    new_by_folded = {}
    change_by_old_path = {}
    change_by_new_path = {}

    def remember(path, change, by_folded, by_path):
        """Record ``path`` under its normalized form, skipping undecodable paths."""
        try:
            folded = fold_path(path)
        except UnicodeDecodeError:
            import logging

            logging.warning(
                "Skipping case-only rename detection for path with invalid UTF-8: %r",
                path,
            )
        else:
            by_folded[folded] = path
            by_path[path] = change

    # A RENAME contributes to both sides, like a DELETE plus an ADD.
    removing_types = (CHANGE_DELETE, CHANGE_RENAME)
    introducing_types = (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)

    for change in changes:
        if change.type in removing_types and change.old:
            assert change.old.path is not None
            remember(change.old.path, change, old_by_folded, change_by_old_path)

        if change.type in introducing_types and change.new:
            assert change.new.path is not None
            remember(change.new.path, change, new_by_folded, change_by_new_path)

    # Pair up sides that collide after normalization.
    superseded = set()
    synthesized_renames = []

    for folded, old_path in old_by_folded.items():
        if folded not in new_by_folded:
            continue
        new_path = new_by_folded[folded]
        if new_path == old_path:
            continue

        # Found a case-only rename: old side from one change, new side from
        # the other.
        old_change = change_by_old_path[old_path]
        new_change = change_by_new_path[new_path]
        synthesized_renames.append(
            TreeChange(CHANGE_RENAME, old_change.old, new_change.new)
        )

        # Mark the old changes for removal
        superseded.add(old_change)
        superseded.add(new_change)

    # Drop the superseded changes and append the renames that replace them.
    result = [change for change in changes if change not in superseded]
    result.extend(synthesized_renames)
    return result
def update_working_tree(
    repo: "Repo",
    old_tree_id: bytes | None,
    new_tree_id: bytes,
    change_iterator: Iterator["TreeChange"],
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] | None = None,
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
    force_remove_untracked: bool = False,
    blob_normalizer: "FilterBlobNormalizer | None" = None,
    tree_encoding: str = "utf-8",
    allow_overwrite_modified: bool = False,
    *,
    config: "Config | None" = None,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    All safety checks (files that must become directories, locally modified
    files) run before anything is changed on disk.

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      change_iterator: Iterator of TreeChange objects to apply
      honor_filemode: An optional flag to honor core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in working
        directory but not in target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)
      allow_overwrite_modified: If False, raise an error when attempting to
        overwrite files that have been modified compared to old_tree_id
      config: Repository configuration. If None, falls back to
        ``repo.get_config_stack()``. It is used for opening the index, for
        reading ``core.ignorecase`` and for case-only rename detection.

    Raises:
      OSError: if a path cannot be accessed, or a locally modified file would
        have to be replaced by a directory
      WorkingTreeModifiedError: if local modifications would be overwritten
        and ``allow_overwrite_modified`` is False
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        CHANGE_UNCHANGED,
    )

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    if config is None:
        config = repo.get_config_stack()
    index = repo.open_index(config=config)

    # Convert iterator to list since we need multiple passes
    changes = list(change_iterator)

    # Transform case-only renames on case-insensitive filesystems. The
    # configuration resolved above is used throughout, so a caller-supplied
    # config is honoured here as well.
    import platform

    default_ignore_case = platform.system() in ("Windows", "Darwin")
    ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)

    if ignore_case:
        changes = detect_case_only_renames(changes, config)

    # Index the deletions by path once; the first deletion of a path wins.
    deleted_by_path: dict[bytes, "TreeChange"] = {}
    for change in changes:
        if (
            change.type == CHANGE_DELETE
            and change.old
            and change.old.path is not None
        ):
            deleted_by_path.setdefault(change.old.path, change)

    # Check for path conflicts where files need to become directories: a
    # parent of a new path that is being deleted was a file and becomes a dir.
    paths_becoming_dirs = set()
    for change in changes:
        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
            assert change.new is not None
            path = change.new.path
            assert path is not None
            parts = path.split(b"/")
            for i in range(1, len(parts)):
                parent = b"/".join(parts[:i])
                if parent in deleted_by_path:
                    paths_becoming_dirs.add(parent)

    # Check if any path that needs to become a directory has been modified
    for path in paths_becoming_dirs:
        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            continue  # File doesn't exist, nothing to check
        except OSError as e:
            raise OSError(
                f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
            ) from e

        if stat.S_ISREG(current_stat.st_mode):
            old_change = deleted_by_path.get(path)
            if old_change is not None:
                # Check if file has been modified
                assert old_change.old is not None
                assert (
                    old_change.old.sha is not None and old_change.old.mode is not None
                )
                file_matches = _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_change.old.sha,
                    old_change.old.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                )
                if not file_matches:
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )

    # Check for uncommitted modifications before making any changes
    if not allow_overwrite_modified and old_tree_id:
        for change in changes:
            # Only check files that are being modified or deleted
            if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
                path = change.old.path
                assert path is not None
                if not validate_path(path, validate_path_element):
                    continue

                full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
                try:
                    current_stat = os.lstat(full_path)
                except FileNotFoundError:
                    continue  # File doesn't exist, nothing to check
                except OSError as e:
                    raise OSError(
                        f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                    ) from e

                if stat.S_ISREG(current_stat.st_mode):
                    # Check if working tree file differs from old tree
                    assert change.old.sha is not None and change.old.mode is not None
                    file_matches = _check_file_matches(
                        repo.object_store,
                        full_path,
                        change.old.sha,
                        change.old.mode,
                        current_stat,
                        honor_filemode,
                        blob_normalizer,
                        path,
                    )
                    if not file_matches:
                        from .errors import WorkingTreeModifiedError

                        raise WorkingTreeModifiedError(
                            f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
                            f"would be overwritten by checkout. "
                            f"Please commit your changes or stash them before you switch branches."
                        )

    # Apply the changes
    for change in changes:
        if change.type in (CHANGE_DELETE, CHANGE_RENAME):
            # Remove file/directory
            assert change.old is not None and change.old.path is not None
            path = change.old.path
            if not validate_path(path, validate_path_element):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                delete_stat: os.stat_result | None = os.lstat(full_path)
            except FileNotFoundError:
                delete_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            _transition_to_absent(repo, path, full_path, delete_stat, index)

        if change.type in (
            CHANGE_ADD,
            CHANGE_MODIFY,
            CHANGE_UNCHANGED,
            CHANGE_COPY,
            CHANGE_RENAME,
        ):
            # Add or modify file
            assert (
                change.new is not None
                and change.new.path is not None
                and change.new.mode is not None
            )
            path = change.new.path
            if not validate_path(path, validate_path_element):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                modify_stat: os.stat_result | None = os.lstat(full_path)
            except FileNotFoundError:
                modify_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            if S_ISGITLINK(change.new.mode):
                _transition_to_submodule(
                    repo, path, full_path, modify_stat, change.new, index
                )
            else:
                _transition_to_file(
                    repo.object_store,
                    path,
                    full_path,
                    modify_stat,
                    change.new,
                    index,
                    honor_filemode,
                    symlink_fn,
                    blob_normalizer,
                    tree_encoding,
                )

    index.write()
def _stat_matches_entry(
    st: os.stat_result, entry: IndexEntry, trust_ctime: bool = True
) -> bool:
    """Tell whether a stat result agrees with what an index entry recorded.

    This lets callers decide that a file is probably unchanged without
    reading its content, avoiding expensive filter operations.

    Args:
      st: Filesystem stat result
      entry: Index entry to compare against
      trust_ctime: If True, also check ctime (default: True, matching Git behavior)
    Returns: True if stat matches and file is likely unchanged
    """

    def timestamp_differs(recorded, ns_attr: str, seconds_attr: str) -> bool:
        """Compare one recorded timestamp against the matching stat field."""
        # Entries store either (seconds, nanoseconds) or a plain number.
        if isinstance(recorded, tuple):
            seconds, nanos = recorded[0], recorded[1]
        else:
            seconds, nanos = int(recorded), 0

        if hasattr(st, ns_attr):
            # Use nanosecond precision when available; this matters for fast
            # workflows (e.g., stash) that touch a file several times within
            # the same second.
            return getattr(st, ns_attr) != seconds * 1_000_000_000 + nanos
        # Fall back to second precision
        return int(getattr(st, seconds_attr)) != seconds

    if trust_ctime and timestamp_differs(entry.ctime, "st_ctime_ns", "st_ctime"):
        return False

    if timestamp_differs(entry.mtime, "st_mtime_ns", "st_mtime"):
        return False

    # Timestamps agree, so the size decides.
    return st.st_size == entry.size
def _check_entry_for_changes(
    tree_path: bytes,
    entry: IndexEntry | ConflictedIndexEntry,
    root_path: bytes,
    filter_blob_callback: Callable[[Blob, bytes], Blob] | None = None,
    trust_ctime: bool = True,
) -> bytes | None:
    """Check a single index entry for changes.

    Args:
      tree_path: Path in the tree
      entry: Index entry to check
      root_path: Root filesystem path
      filter_blob_callback: Optional callback to filter blobs
      trust_ctime: If True, use ctime for change detection (default: True)
    Returns: tree_path if changed, None otherwise
    """
    if isinstance(entry, ConflictedIndexEntry):
        # Conflicted files are always unstaged
        return tree_path

    full_path = _tree_to_fs_path(root_path, tree_path)
    try:
        st = os.lstat(full_path)
        if stat.S_ISDIR(st.st_mode):
            # Inspect the directory through its filesystem path: the tree
            # path is relative to the repository root and would otherwise be
            # resolved against the current working directory.
            if _has_directory_changed(full_path, entry):
                return tree_path
            return None

        if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
            # Neither a regular file nor a symlink: not reported as changed.
            return None

        # Optimization: If stat matches index entry (mtime and size unchanged),
        # we can skip reading and filtering the file entirely. This is a significant
        # performance improvement for repositories with many unchanged files.
        # Even with filters (e.g., LFS), if the file hasn't been modified (stat unchanged),
        # the filter output would be the same, so we can safely skip the expensive
        # filter operation. This addresses performance issues with LFS repositories
        # where filter operations can be very slow.
        if _stat_matches_entry(st, entry, trust_ctime):
            return None

        blob = blob_from_path_and_stat(full_path, st)

        if filter_blob_callback is not None:
            blob = filter_blob_callback(blob, tree_path)
    except FileNotFoundError:
        # The file was removed, so we assume that counts as
        # different from whatever file used to exist.
        return tree_path
    else:
        if blob.id != entry.sha:
            return tree_path
    return None
def get_unstaged_changes(
    index: Index,
    root_path: str | bytes,
    filter_blob_callback: Callable[..., Any] | None = None,
    preload_index: bool = False,
    trust_ctime: bool = True,
    max_stat: int | None = None,
) -> Generator[bytes, None, None]:
    """Yield the index paths whose working tree state differs from the index.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callback to filter blobs
      preload_index: If True, use parallel threads to check files (requires threading support)
      trust_ctime: If True, use ctime for change detection (default: True)
      max_stat: If set, limit the number of stat operations performed.
        When the limit is reached, remaining files are assumed unchanged.
    Returns: iterator over paths with unstaged changes
    """
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    if preload_index:
        # Use parallel processing for better performance on slow filesystems
        try:
            import multiprocessing
            from concurrent.futures import ThreadPoolExecutor
        except ImportError:
            # If threading is not available, fall through to serial processing
            pass
        else:
            work_items = list(index.iteritems())
            if max_stat is not None:
                # Only the first max_stat entries are examined.
                work_items = work_items[:max_stat]

            # Use number of CPUs but cap at 8 threads to avoid overhead
            pool_size = min(multiprocessing.cpu_count(), 8)

            with ThreadPoolExecutor(max_workers=pool_size) as executor:
                pending = [
                    executor.submit(
                        _check_entry_for_changes,
                        tree_path,
                        entry,
                        root_path,
                        filter_blob_callback,
                        trust_ctime,
                    )
                    for tree_path, entry in work_items
                ]

                # Results are consumed in submission order.
                for job in pending:
                    changed_path = job.result()
                    if changed_path is not None:
                        yield changed_path
            return

    # Serial processing
    for checked, (tree_path, entry) in enumerate(index.iteritems()):
        if max_stat is not None and checked >= max_stat:
            return
        changed_path = _check_entry_for_changes(
            tree_path, entry, root_path, filter_blob_callback, trust_ctime
        )
        if changed_path is not None:
            yield changed_path
def _decode_utf8_with_fallback(data: bytes) -> str:
    """Decode UTF-8 bytes, substituting lossy forms for invalid sequences.

    Mirrors the behaviour of git-for-windows's ``xutftowcsn`` (in
    ``compat/mingw.c``) so that tree paths containing legacy-encoded or
    otherwise invalid UTF-8 produce the same on-disk filename as C git.

    Rules:
      * Well-formed 1-4 byte sequences (no overlongs, nothing above
        U+10FFFF) decode to their code point.
      * An invalid byte in 0xa0-0xff becomes U+00A0-U+00FF.
      * An invalid byte in 0x80-0x9f becomes two lowercase hex digits
        (e.g. byte 0x80 -> "80").
      * For truncated, overlong or out-of-range sequences only the lead byte
        is consumed by the rules above; the bytes after it are examined
        again on the following iterations.
    """

    def is_trail(offset: int) -> bool:
        """Return True if the byte at ``offset`` is a continuation byte."""
        return (data[offset] & 0xC0) == 0x80

    pieces: list[str] = []
    pos = 0
    end = len(data)
    while pos < end:
        lead = data[pos]
        following = end - pos - 1  # bytes available after the lead byte

        if lead < 0x80:
            pieces.append(chr(lead))
            pos += 1
            continue

        if 0xC2 <= lead <= 0xDF and following >= 1 and is_trail(pos + 1):
            pieces.append(chr(((lead & 0x1F) << 6) | (data[pos + 1] & 0x3F)))
            pos += 2
            continue

        if (
            0xE0 <= lead <= 0xEF
            and following >= 2
            and not (lead == 0xE0 and data[pos + 1] < 0xA0)  # overlong
            and is_trail(pos + 1)
            and is_trail(pos + 2)
        ):
            code_point = (
                ((lead & 0x0F) << 12)
                | ((data[pos + 1] & 0x3F) << 6)
                | (data[pos + 2] & 0x3F)
            )
            pieces.append(chr(code_point))
            pos += 3
            continue

        if (
            0xF0 <= lead <= 0xF4
            and following >= 3
            and not (lead == 0xF0 and data[pos + 1] < 0x90)  # overlong
            and not (lead == 0xF4 and data[pos + 1] >= 0x90)  # > U+10FFFF
            and is_trail(pos + 1)
            and is_trail(pos + 2)
            and is_trail(pos + 3)
        ):
            code_point = (
                ((lead & 0x07) << 18)
                | ((data[pos + 1] & 0x3F) << 12)
                | ((data[pos + 2] & 0x3F) << 6)
                | (data[pos + 3] & 0x3F)
            )
            pieces.append(chr(code_point))
            pos += 4
            continue

        # Invalid lead byte: consume just this one byte.
        pieces.append(chr(lead) if lead >= 0xA0 else f"{lead:02x}")
        pos += 1

    return "".join(pieces)
def _tree_to_fs_path(
    root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
) -> bytes:
    """Map a git tree path onto a path below ``root_path``.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes (encoded with tree_encoding)
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    relative = (
        tree_path
        if os_sep_bytes == b"/"
        else tree_path.replace(b"/", os_sep_bytes)
    )

    # On Windows, decode tree-encoded bytes to a str so they can flow into
    # the wide-char Win32 APIs via Python's filesystem layer. UTF-8 (the
    # default tree encoding) goes through the lossy decoder matching C git's
    # xutftowcsn fallbacks; for any other encoding a UnicodeDecodeError is
    # allowed to propagate rather than silently producing a corrupt path.
    if sys.platform == "win32":
        decoded = (
            _decode_utf8_with_fallback(relative)
            if tree_encoding == "utf-8"
            else relative.decode(tree_encoding)
        )
        relative = os.fsencode(decoded)

    return os.path.join(root_path, relative)
def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes:
    """Turn a file system path into a git tree path.

    Args:
      fs_path: File system path, as str or bytes.
      tree_encoding: Encoding to use for tree paths (default: utf-8); it is
        only applied on Windows

    Returns: Git tree path as bytes, using "/" as the separator.
    """
    raw = fs_path if isinstance(fs_path, bytes) else os.fsencode(fs_path)

    if sys.platform == "win32":
        # On Windows the bytes are in the filesystem codec; go through str and
        # re-encode under the tree encoding. This mirrors C git's xwcstoutf and
        # deliberately does not try to undo the lossy fallbacks applied when a
        # tree path with invalid UTF-8 was checked out, so such a path reads
        # back in its lossy form (a one-way mapping).
        raw = os.fsdecode(raw).encode(tree_encoding)

    if os_sep_bytes == b"/":
        return raw
    return raw.replace(os_sep_bytes, b"/")
def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None:
    """Build the index entry for a directory that is a submodule.

    A directory only yields an entry when it contains a ``.git`` entry and
    read_submodule_head() returns a head for it.

    Args:
      st: Stat result for the directory
      path: Path to the directory

    Returns:
      IndexEntry with mode S_IFGITLINK for a submodule, or None when the
      directory has no ``.git`` entry or no head could be read
    """
    git_marker = os.path.join(path, b".git")
    if not os.path.exists(git_marker):
        return None

    submodule_head = read_submodule_head(path)
    if submodule_head is None:
        return None

    return index_entry_from_stat(st, submodule_head, mode=S_IFGITLINK)
def index_entry_from_path(
    path: bytes, object_store: ObjectContainer | None = None
) -> IndexEntry | None:
    """Create an index entry from a filesystem path.

    Regular files and symlinks produce a blob-backed entry; directories are
    delegated to index_entry_from_directory(), which only returns an entry
    for submodules. Any other file type yields None.

    The path is inspected with os.lstat(), which is not guarded here, so an
    error it raises (for example for a path that does not exist) propagates
    to the caller.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry, or None when no entry applies to the path
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    mode = st.st_mode

    if stat.S_ISDIR(mode):
        return index_entry_from_directory(st, path)

    if not (stat.S_ISREG(mode) or stat.S_ISLNK(mode)):
        return None

    blob = blob_from_path_and_stat(path, st)
    if object_store is not None:
        object_store.add_object(blob)
    return index_entry_from_stat(st, blob.id)
def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: ObjectContainer | None = None,
) -> Iterator[tuple[bytes, IndexEntry | None]]:
    """Iterate over current versions of index entries on disk.

    Each tree path is mapped below root_path and re-read from the filesystem.
    A path that can no longer be read as an entry is reported with None
    rather than aborting the iteration.

    Args:
      paths: Tree paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over (path, index_entry) tuples; index_entry is None
      when the path is gone or index_entry_from_path() produced no entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, NotADirectoryError, IsADirectoryError):
            # NotADirectoryError covers the case where a parent component of
            # the path has been replaced by a non-directory: the lstat fails
            # with ENOTDIR, and the entry is just as gone as with ENOENT.
            entry = None
        yield path, entry
def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: ObjectContainer | None = None,
) -> Iterator[tuple[bytes, ObjectID | None, int | None]]:
    """Yield the on-disk sha and mode for each path referenced by the index.

    Args:
      paths: Paths to check
      root_path: Root path to access from
      include_deleted: When true, paths without a fresh entry are yielded
        with sha and mode set to None; otherwise they are skipped
      object_store: Optional object store to report new items to
    Returns: Iterator over (path, sha, mode) tuples
    """
    fresh = iter_fresh_entries(paths, root_path, object_store=object_store)
    for path, entry in fresh:
        if entry is not None:
            yield path, entry.sha, cleanup_mode(entry.mode)
        elif include_deleted:
            yield path, None, None
def refresh_index(index: Index, root_path: bytes) -> None:
    """Update an index in place from the files currently on disk.

    Every path already present in the index is re-read below root_path and
    its entry is replaced with the fresh one. Paths for which no fresh entry
    is produced keep their existing entry.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if not entry:
            continue
        # Only paths yielded from the index itself are assigned here.
        index[path] = entry
class locked_index:
    """Lock the index while making modifications.

    Works as a context manager. Entering opens the index path for writing
    through GitFile and returns an Index loaded from the same path; leaving
    the block without an exception writes the index back, while leaving it
    with an exception aborts the write.
    """

    _file: "_GitFile"

    def __init__(self, path: bytes | str) -> None:
        """Initialize locked_index.

        Args:
          path: Path to the index file to lock
        """
        self._path = path

    def __enter__(self) -> Index:
        """Enter context manager and lock index.

        Returns: The Index loaded from the locked path
        """
        f = GitFile(self._path, "wb")
        try:
            index = Index(self._path)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the file
            # opened above has to be aborted here or it would stay open.
            f.abort()
            raise
        self._file = f
        self._index = index
        return index

    def __exit__(
        self,
        exc_type: type | None,
        exc_value: BaseException | None,
        traceback: types.TracebackType | None,
    ) -> None:
        """Exit context manager and unlock index.

        If the block raised, the pending write is aborted and the exception
        continues to propagate. Otherwise the index is written out; if that
        write fails, the file is aborted and the failure is re-raised.
        """
        if exc_type is not None:
            self._file.abort()
            return
        try:
            f = SHA1Writer(self._file)
            write_index_dict(f, self._index._byname)
        except BaseException:
            self._file.abort()
            # Re-raise so callers never mistake a failed write for a saved
            # index, and so KeyboardInterrupt/SystemExit are not swallowed.
            raise
        else:
            f.close()