# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
22"""Parser for the git index file format."""
24__all__ = [
25 "DEFAULT_VERSION",
26 "EOIE_EXTENSION",
27 "EXTENDED_FLAG_INTEND_TO_ADD",
28 "EXTENDED_FLAG_SKIP_WORKTREE",
29 "FLAG_EXTENDED",
30 "FLAG_NAMEMASK",
31 "FLAG_STAGEMASK",
32 "FLAG_STAGESHIFT",
33 "FLAG_VALID",
34 "HFS_IGNORABLE_CHARS",
35 "IEOT_EXTENSION",
36 "INVALID_DOTNAMES",
37 "REUC_EXTENSION",
38 "SDIR_EXTENSION",
39 "TREE_EXTENSION",
40 "UNTR_EXTENSION",
41 "Index",
42 "IndexEntry",
43 "IndexExtension",
44 "ResolveUndoExtension",
45 "SerializedIndexEntry",
46 "SparseDirExtension",
47 "Stage",
48 "TreeDict",
49 "TreeExtension",
50 "UnmergedEntries",
51 "UnsupportedIndexFormat",
52 "UntrackedExtension",
53 "blob_from_path_and_mode",
54 "blob_from_path_and_stat",
55 "build_file_from_blob",
56 "build_index_from_tree",
57 "changes_from_tree",
58 "cleanup_mode",
59 "commit_index",
60 "commit_tree",
61 "detect_case_only_renames",
62 "get_path_element_normalizer",
63 "get_unstaged_changes",
64 "index_entry_from_stat",
65 "pathjoin",
66 "pathsplit",
67 "read_cache_entry",
68 "read_cache_time",
69 "read_index",
70 "read_index_dict",
71 "read_index_dict_with_version",
72 "read_index_header",
73 "read_submodule_head",
74 "update_working_tree",
75 "validate_path",
76 "validate_path_element_default",
77 "validate_path_element_hfs",
78 "validate_path_element_ntfs",
79 "write_cache_entry",
80 "write_cache_time",
81 "write_index",
82 "write_index_dict",
83 "write_index_extension",
84]

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import (
    Callable,
    Generator,
    Iterable,
    Iterator,
    Mapping,
    Sequence,
    Set,
)
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
TreeDict = dict[bytes, "TreeDict | tuple[int, ObjectID]"]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"
SDIR_EXTENSION = b"sdir"  # Sparse directory extension


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
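

# Illustrative round-trip sketch (comments only; uses nothing beyond the two
# helpers above). 300 needs two 7-bit groups, emitted low-order first with
# the high bit marking continuation:
#
#   >>> _encode_varint(300)
#   b'\xac\x02'
#   >>> _decode_varint(b"\xac\x02")
#   (300, 2)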


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
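

# Illustrative sketch of the v4 path compression (comments only; the paths
# are made up). b"src/main.py" against the previous entry b"src/lib.py"
# shares the 4-byte prefix b"src/", so 6 trailing bytes of the previous path
# are dropped and the new suffix is appended:
#
#   >>> _compress_path(b"src/main.py", b"src/lib.py")
#   b'\x06main.py\x00'
#   >>> _decompress_path(b"\x06main.py\x00", 0, b"src/lib.py")
#   (b'src/main.py', 9)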


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    def is_sparse_dir(self) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and self.name.endswith(b"/")
        )
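

# Illustrative: the stage lives in bits 12-13 of the flags word, so a flags
# value of 0x2000 decodes to stage 2 (MERGE_CONFLICT_THIS):
#
#   >>> Stage((0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#   <Stage.MERGE_CONFLICT_THIS: 2>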


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        elif signature == SDIR_EXTENSION:
            return SparseDirExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


class SparseDirExtension(IndexExtension):
    """Sparse directory extension.

    This extension indicates that the index contains sparse directory entries.
    Tools that don't understand sparse index should avoid interacting with
    the index when this extension is present.

    The extension data is empty - its presence is the signal.
    """

    def __init__(self) -> None:
        """Initialize SparseDirExtension."""
        super().__init__(SDIR_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "SparseDirExtension":
        """Parse SparseDirExtension from bytes.

        Args:
          data: Raw bytes to parse (should be empty)

        Returns:
          SparseDirExtension instance
        """
        return cls()

    def to_bytes(self) -> bytes:
        """Serialize SparseDirExtension to bytes.

        Returns:
          Empty bytes (extension presence is the signal)
        """
        return b""


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: ObjectID
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED

    def is_sparse_dir(self, name: bytes) -> bool:
        """Check if this entry represents a sparse directory.

        A sparse directory entry is a collapsed representation of an entire
        directory tree in a sparse index. It has:
        - Directory mode (0o040000)
        - SKIP_WORKTREE flag set
        - Path ending with '/'
        - SHA pointing to a tree object

        Args:
          name: The path name for this entry (IndexEntry doesn't store name)

        Returns:
          True if entry is a sparse directory entry
        """
        return (
            stat.S_ISDIR(self.mode)
            and bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
            and name.endswith(b"/")
        )


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: IndexEntry | None
    this: IndexEntry | None
    other: IndexEntry | None

    def __init__(
        self,
        ancestor: IndexEntry | None = None,
        this: IndexEntry | None = None,
        other: IndexEntry | None = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
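

# Illustrative behaviour of the two path helpers (comments only):
#
#   >>> pathsplit(b"foo/bar/baz.txt")
#   (b'foo/bar', b'baz.txt')
#   >>> pathsplit(b"baz.txt")
#   (b'', b'baz.txt')
#   >>> pathjoin(b"foo", b"", b"baz.txt")  # empty components are skipped
#   b'foo/baz.txt'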


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: int | float | tuple[int, int]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
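

# Illustrative round trip (comments only): a float timestamp is split into
# whole seconds and nanoseconds before being packed as two big-endian
# 32-bit words, so 1234.5 is stored as (1234, 500000000):
#
#   >>> import io
#   >>> buf = io.BytesIO()
#   >>> write_cache_time(buf, 1234.5)
#   >>> read_cache_time(io.BytesIO(buf.getvalue()))
#   (1234, 500000000)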


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)

    Returns:
      SerializedIndexEntry parsed from the stream
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to the actual filename
        # length. This matches how C Git implements index v4 flags.
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
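

# Worked example of the pre-v4 padding rule above (comments only): the fixed
# portion of an entry is 62 bytes, so with beginoffset 0 and a 7-byte path,
# f.tell() is 69 after the name is written. real_size is then
# (69 + 8) & ~7 == 72 and 3 NUL bytes are appended. The rounding always
# leaves between 1 and 8 NULs, which also terminates the path.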


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[dict[bytes, IndexEntry | ConflictedIndexEntry], int, list[IndexExtension]]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, IndexEntry | ConflictedIndexEntry]:
    """Read an index file and return it as a dictionary.

    Keys are entry paths; a path with merge conflicts maps to a
    ConflictedIndexEntry holding its staged variants rather than a
    plain IndexEntry.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, IndexEntry | ConflictedIndexEntry] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Iterable over the entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require index format v3 or later; bump the version if
    # any entry uses them.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    if version < 3:
        # Final sanity check: no extended flags may be written in a pre-v3 index
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, IndexEntry | ConflictedIndexEntry],
    version: int | None = None,
    extensions: Sequence[IndexExtension] | None = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path and then by stage before writing.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)
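

# Illustrative round trip through an in-memory buffer (comments only; the
# path is made up and the hex value is the well-known empty-blob id):
#
#   >>> import io
#   >>> entry = IndexEntry(
#   ...     ctime=0, mtime=0, dev=0, ino=0, mode=0o100644, uid=0, gid=0,
#   ...     size=0, sha=b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391")
#   >>> buf = io.BytesIO()
#   >>> write_index_dict(buf, {b"hello.txt": entry})
#   >>> _ = buf.seek(0)
#   >>> list(read_index_dict(buf))
#   [b'hello.txt']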


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
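

# Illustrative: anything that is not a symlink, directory or gitlink
# collapses to a regular file with 0o644 or (if the owner execute bit was
# set) 0o755 permissions:
#
#   >>> oct(cleanup_mode(0o100664))
#   '0o100644'
#   >>> oct(cleanup_mode(0o100775))
#   '0o100755'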


class Index:
    """A Git Index file."""

    _byname: dict[bytes, IndexEntry | ConflictedIndexEntry]

    def __init__(
        self,
        filename: bytes | str | os.PathLike[str],
        read: bool = True,
        skip_hash: bool = False,
        version: int | None = None,
        *,
        file_mode: int | None = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
          file_mode: Optional file permission mask for shared repository
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._file_mode = file_mode
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> bytes | str:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        mask = self._file_mode if self._file_mode is not None else 0o644
        f = GitFile(self._filename, "wb", mask=mask)
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> IndexEntry | ConflictedIndexEntry:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> ObjectID:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, ObjectID, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: IndexEntry | ConflictedIndexEntry
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, IndexEntry | ConflictedIndexEntry]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(self, entries: dict[bytes, IndexEntry | ConflictedIndexEntry]) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[bytes | None, bytes | None],
            tuple[int | None, int | None],
            tuple[bytes | None, bytes | None],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
          newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> ObjectID:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())

    def is_sparse(self) -> bool:
        """Check if this index contains sparse directory entries.

        Returns:
          True if any sparse directory extension is present
        """
        return any(isinstance(ext, SparseDirExtension) for ext in self._extensions)

    def ensure_full_index(self, object_store: "BaseObjectStore") -> None:
        """Expand all sparse directory entries into full file entries.

        This converts a sparse index into a full index by recursively
        expanding any sparse directory entries into their constituent files.

        Args:
          object_store: Object store to read tree objects from

        Raises:
          KeyError: If a tree object referenced by a sparse dir entry doesn't exist
        """
        if not self.is_sparse():
            return

        # Find all sparse directory entries
        sparse_dirs = []
        for path, entry in list(self._byname.items()):
            if isinstance(entry, IndexEntry) and entry.is_sparse_dir(path):
                sparse_dirs.append((path, entry))

        # Expand each sparse directory
        for path, entry in sparse_dirs:
            # Remove the sparse directory entry
            del self._byname[path]

            # Get the tree object
            tree = object_store[entry.sha]
            if not isinstance(tree, Tree):
                raise ValueError(f"Sparse directory {path!r} points to non-tree object")

            # Recursively add all entries from the tree
            self._expand_tree(path.rstrip(b"/"), tree, object_store, entry)

        # Remove the sparse directory extension
        self._extensions = [
            ext for ext in self._extensions if not isinstance(ext, SparseDirExtension)
        ]

    def _expand_tree(
        self,
        prefix: bytes,
        tree: Tree,
        object_store: "BaseObjectStore",
        template_entry: IndexEntry,
    ) -> None:
        """Recursively expand a tree into index entries.

        Args:
          prefix: Path prefix for entries (without trailing slash)
          tree: Tree object to expand
          object_store: Object store to read nested trees from
          template_entry: Template entry to copy metadata from
        """
        for name, mode, sha in tree.items():
            if prefix:
                full_path = prefix + b"/" + name
            else:
                full_path = name

            if stat.S_ISDIR(mode):
                # Recursively expand subdirectories
                subtree = object_store[sha]
                if not isinstance(subtree, Tree):
                    raise ValueError(
                        f"Directory entry {full_path!r} points to non-tree object"
                    )
                self._expand_tree(full_path, subtree, object_store, template_entry)
            else:
                # Create an index entry for this file
                # Use the template entry for metadata but with the file's sha and mode
                new_entry = IndexEntry(
                    ctime=template_entry.ctime,
                    mtime=template_entry.mtime,
                    dev=template_entry.dev,
                    ino=template_entry.ino,
                    mode=mode,
                    uid=template_entry.uid,
                    gid=template_entry.gid,
                    size=0,  # Size is unknown from tree
                    sha=sha,
                    flags=0,
                    extended_flags=0,  # Don't copy skip-worktree flag
                )
                self._byname[full_path] = new_entry

    def convert_to_sparse(
        self,
        object_store: "BaseObjectStore",
        tree_sha: ObjectID,
        sparse_dirs: Set[bytes],
    ) -> None:
        """Convert full index entries to sparse directory entries.

        This collapses directories that are entirely outside the sparse
        checkout cone into single sparse directory entries.

        Args:
          object_store: Object store to read tree objects
          tree_sha: SHA of the tree (usually HEAD) to base sparse dirs on
          sparse_dirs: Set of directory paths (with trailing /) to collapse

        Raises:
          KeyError: If tree_sha or a subdirectory doesn't exist
        """
        if not sparse_dirs:
            return

        # Get the base tree
        tree = object_store[tree_sha]
        if not isinstance(tree, Tree):
            raise ValueError(f"tree_sha {tree_sha!r} is not a tree object")

        # For each sparse directory, find its tree SHA and create sparse entry
        for dir_path in sparse_dirs:
            dir_path_stripped = dir_path.rstrip(b"/")

            # Find the tree SHA for this directory
            subtree_sha = self._find_subtree_sha(tree, dir_path_stripped, object_store)
            if subtree_sha is None:
                # Directory doesn't exist in tree, skip it
                continue

            # Remove all entries under this directory
            entries_to_remove = [
                path
                for path in self._byname
                if path.startswith(dir_path) or path == dir_path_stripped
            ]
            for path in entries_to_remove:
                del self._byname[path]

            # Create a sparse directory entry
            # Use minimal metadata since it's not a real file
            from dulwich.objects import ObjectID

            sparse_entry = IndexEntry(
                ctime=0,
                mtime=0,
                dev=0,
                ino=0,
                mode=stat.S_IFDIR,
                uid=0,
                gid=0,
                size=0,
                sha=ObjectID(subtree_sha),
                flags=0,
                extended_flags=EXTENDED_FLAG_SKIP_WORKTREE,
            )
            self._byname[dir_path] = sparse_entry

        # Add sparse directory extension if not present
        if not self.is_sparse():
            self._extensions.append(SparseDirExtension())

    def _find_subtree_sha(
        self,
        tree: Tree,
        path: bytes,
        object_store: "BaseObjectStore",
    ) -> bytes | None:
        """Find the SHA of a subtree at a given path.

        Args:
          tree: Root tree object to search in
          path: Path to the subdirectory (no trailing slash)
          object_store: Object store to read nested trees from

        Returns:
          SHA of the subtree, or None if path doesn't exist
        """
        if not path:
            return tree.id

        parts = path.split(b"/")
        current_tree = tree

        for part in parts:
            # Look for this part in the current tree
            try:
                mode, sha = current_tree[part]
            except KeyError:
                return None

            if not stat.S_ISDIR(mode):
                # Path component is a file, not a directory
                return None

            # Load the next tree
            obj = object_store[sha]
            if not isinstance(obj, Tree):
                return None
            current_tree = obj

        return current_tree.id
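

# Illustrative usage sketch for the class above (comments only; the index
# path is hypothetical):
#
#   index = Index(".git/index")
#   for path, entry in index.items():
#       if isinstance(entry, ConflictedIndexEntry):
#           continue  # unmerged path; stages live on the conflict object
#       print(path, entry.sha, oct(entry.mode))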


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, ObjectID, int]]
) -> ObjectID:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> ObjectID:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
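

# Illustrative sketch (comments only) using an in-memory object store; the
# blob contents and path are made up:
#
#   >>> from dulwich.object_store import MemoryObjectStore
#   >>> store = MemoryObjectStore()
#   >>> blob = Blob.from_string(b"hello\n")
#   >>> store.add_object(blob)
#   >>> tree_id = commit_tree(store, [(b"docs/readme.txt", blob.id, 0o100644)])
#   >>> len(tree_id)  # hex SHA of the root tree; b"docs" got its own subtree
#   40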


def commit_index(object_store: ObjectContainer, index: Index) -> ObjectID:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: ObjectID | None,
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[bytes | None, bytes | None],
        tuple[int | None, int | None],
        tuple[bytes | None, bytes | None],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
      (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: int | None = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    from dulwich.objects import ObjectID

    # Use nanosecond precision when available to avoid precision loss
    # through float representation
    ctime: int | float | tuple[int, int]
    mtime: int | float | tuple[int, int]
    st_ctime_ns = getattr(stat_val, "st_ctime_ns", None)
    if st_ctime_ns is not None:
        ctime = (
            st_ctime_ns // 1_000_000_000,
            st_ctime_ns % 1_000_000_000,
        )
    else:
        ctime = stat_val.st_ctime

    st_mtime_ns = getattr(stat_val, "st_mtime_ns", None)
    if st_mtime_ns is not None:
        mtime = (
            st_mtime_ns // 1_000_000_000,
            st_mtime_ns % 1_000_000_000,
        )
    else:
        mtime = stat_val.st_mtime

    return IndexEntry(
        ctime=ctime,
        mtime=mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=ObjectID(hex_sha),
        flags=0,
        extended_flags=0,
    )
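

# Illustrative sketch (comments only; the file path is hypothetical and the
# hex SHA is the well-known empty-blob id):
#
#   st = os.stat("README.md")
#   entry = index_entry_from_stat(st, b"e69de29bb2d1d6434b8b29ae775ad8c2e48c5391")
#   assert entry.size == st.st_size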


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: str | None) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super().__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: str | bytes,
        dst: str | bytes,
        target_is_directory: bool = False,
        *,
        dir_fd: int | None = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Callable[
        [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
    ]
    | None = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
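

# Illustrative: validation rejects any path with a component that
# normalizes to a protected name:
#
#   >>> validate_path(b"src/main.py")
#   True
#   >>> validate_path(b".git/config")
#   False
#   >>> validate_path(b"GIT~1/config", validate_path_element_ntfs)
#   False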
1971def build_index_from_tree(
1972 root_path: str | bytes,
1973 index_path: str | bytes,
1974 object_store: ObjectContainer,
1975 tree_id: ObjectID,
1976 honor_filemode: bool = True,
1977 validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
1978 symlink_fn: Callable[
1979 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
1980 ]
1981 | None = None,
1982 blob_normalizer: "FilterBlobNormalizer | None" = None,
1983 tree_encoding: str = "utf-8",
1984) -> None:
1985 """Generate and materialize index from a tree.
1987 Args:
1988 tree_id: Tree to materialize
1989 root_path: Target dir for materialized index files
1990 index_path: Target path for generated index
1991 object_store: Non-empty object store holding tree contents
1992 honor_filemode: An optional flag to honor core.filemode setting in
1993 config file, default is core.filemode=True, change executable bit
1994 validate_path_element: Function to validate path elements to check
1995 out; default just refuses .git and .. directories.
1996 symlink_fn: Function to use for creating symlinks
1997 blob_normalizer: An optional BlobNormalizer to use for converting line
1998 endings when writing blobs to the working directory.
1999 tree_encoding: Encoding used for tree paths (default: utf-8)
2001 Note: existing index is wiped and contents are not merged
2002 in a working dir. Suitable only for fresh clones.
2003 """
2004 index = Index(index_path, read=False)
2005 if not isinstance(root_path, bytes):
2006 root_path = os.fsencode(root_path)
2008 for entry in iter_tree_contents(object_store, tree_id):
2009 assert (
2010 entry.path is not None and entry.mode is not None and entry.sha is not None
2011 )
2012 if not validate_path(entry.path, validate_path_element):
2013 continue
2014 full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)
2016 if not os.path.exists(os.path.dirname(full_path)):
2017 os.makedirs(os.path.dirname(full_path))
2019 # TODO(jelmer): Merge new index into working tree
2020 if S_ISGITLINK(entry.mode):
2021 if not os.path.isdir(full_path):
2022 os.mkdir(full_path)
2023 st = os.lstat(full_path)
2024 # TODO(jelmer): record and return submodule paths
2025 else:
2026 obj = object_store[entry.sha]
2027 assert isinstance(obj, Blob)
2028 # Apply blob normalization for checkout if normalizer is provided
2029 if blob_normalizer is not None:
2030 obj = blob_normalizer.checkout_normalize(obj, entry.path)
2031 st = build_file_from_blob(
2032 obj,
2033 entry.mode,
2034 full_path,
2035 honor_filemode=honor_filemode,
2036 tree_encoding=tree_encoding,
2037 symlink_fn=symlink_fn,
2038 )
2040 # Add file to index
2041 if not honor_filemode or S_ISGITLINK(entry.mode):
2042 # We cannot use tuple slicing to build a new tuple,
2043 # because on Windows that converts the times to
2044 # longs, which causes errors further along.
2045 st_tuple = (
2046 entry.mode,
2047 st.st_ino,
2048 st.st_dev,
2049 st.st_nlink,
2050 st.st_uid,
2051 st.st_gid,
2052 st.st_size,
2053 st.st_atime,
2054 st.st_mtime,
2055 st.st_ctime,
2056 )
2057 st = st.__class__(st_tuple)
2058 # default to a stage 0 index entry (normal)
2059 # when reading from the filesystem
2060 index[entry.path] = index_entry_from_stat(st, entry.sha)
2062 index.write()
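# Editor's sketch of a fresh checkout (paths and repo handling are
# assumptions for illustration). Per the note above, the target directory
# must be a fresh clone location.
def _example_fresh_checkout(repo_path: str, target_dir: str) -> None:
    """Materialize HEAD's tree into target_dir and write a new index."""
    from .repo import Repo

    repo = Repo(repo_path)
    tree_id = repo[repo.head()].tree
    build_index_from_tree(
        target_dir,
        os.path.join(target_dir, ".git", "index"),
        repo.object_store,
        tree_id,
    )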
2065def blob_from_path_and_mode(
2066 fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
2067) -> Blob:
2068 """Create a blob from a path and a stat object.
2070 Args:
2071 fs_path: Full file system path to file
2072 mode: File mode
2073 tree_encoding: Encoding to use for tree contents
2074 Returns: A `Blob` object
2075 """
2076 assert isinstance(fs_path, bytes)
2077 blob = Blob()
2078 if stat.S_ISLNK(mode):
2079 if sys.platform == "win32":
2080 # os.readlink on Windows requires a str path.
2081 blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
2082 else:
2083 blob.data = os.readlink(fs_path)
2084 else:
2085 with open(fs_path, "rb") as f:
2086 blob.data = f.read()
2087 return blob
2090def blob_from_path_and_stat(
2091 fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
2092) -> Blob:
2093 """Create a blob from a path and a stat object.
2095 Args:
2096 fs_path: Full file system path to file
2097 st: A stat object
2098 tree_encoding: Encoding to use for tree contents
2099 Returns: A `Blob` object
2100 """
2101 return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
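# Editor's sketch (hypothetical helper): hash a working-tree file the way
# the index would record it, without writing it to any object store.
def _example_blob_id_for(fs_path: bytes) -> bytes:
    """Return the blob SHA for the file (or symlink) at fs_path."""
    st = os.lstat(fs_path)
    return blob_from_path_and_stat(fs_path, st).id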
2104def read_submodule_head(path: str | bytes) -> bytes | None:
2105 """Read the head commit of a submodule.
2107 Args:
2108 path: path to the submodule
2109 Returns: HEAD sha, or None if path is not a valid repository or has no HEAD
2110 """
2111 from .errors import NotGitRepository
2112 from .repo import Repo
2114 # Repo currently expects a "str", so decode if necessary.
2115 # TODO(jelmer): Perhaps move this into Repo() ?
2116 if not isinstance(path, str):
2117 path = os.fsdecode(path)
2118 try:
2119 repo = Repo(path)
2120 except NotGitRepository:
2121 return None
2122 try:
2123 return repo.head()
2124 except KeyError:
2125 return None
2128def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
2129 """Check if a directory has changed after getting an error.
2131 When handling an error trying to create a blob from a path, call this
2132 function. It will check if the path is a directory. If it's a directory
2133 and a submodule, check the submodule head to see if it's has changed. If
2134 not, consider the file as changed as Git tracked a file and not a
2135 directory.
2137 Return true if the given path should be considered as changed and False
2138 otherwise or if the path is not a directory.
2139 """
2140 # This is actually a directory
2141 if os.path.exists(os.path.join(tree_path, b".git")):
2142 # Submodule
2143 head = read_submodule_head(tree_path)
2144 if entry.sha != head:
2145 return True
2146 else:
2147 # The file was changed to a directory, so consider it removed.
2148 return True
2150 return False
2153os_sep_bytes = os.sep.encode("ascii")
2156def _ensure_parent_dir_exists(full_path: bytes) -> None:
2157 """Ensure parent directory exists, checking no parent is a file."""
2158 parent_dir = os.path.dirname(full_path)
2159 if parent_dir and not os.path.exists(parent_dir):
2160 # Walk up the directory tree to find the first existing parent
2161 current = parent_dir
2162 parents_to_check: list[bytes] = []
2164 while current and not os.path.exists(current):
2165 parents_to_check.insert(0, current)
2166 new_parent = os.path.dirname(current)
2167 if new_parent == current:
2168 # Reached the root or can't go up further
2169 break
2170 current = new_parent
2172 # Check if the existing parent (if any) is a directory
2173 if current and os.path.exists(current) and not os.path.isdir(current):
2174 raise OSError(
2175 f"Cannot create directory, parent path is a file: {current!r}"
2176 )
2178 # Now check each parent we need to create isn't blocked by an existing file
2179 for parent_path in parents_to_check:
2180 if os.path.exists(parent_path) and not os.path.isdir(parent_path):
2181 raise OSError(
2182 f"Cannot create directory, parent path is a file: {parent_path!r}"
2183 )
2185 os.makedirs(parent_dir)
2188def _remove_file_with_readonly_handling(path: bytes) -> None:
2189 """Remove a file, handling read-only files on Windows.
2191 Args:
2192 path: Path to the file to remove
2193 """
2194 try:
2195 os.unlink(path)
2196 except PermissionError:
2197 # On Windows, remove read-only attribute and retry
2198 if sys.platform == "win32":
2199 os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
2200 os.unlink(path)
2201 else:
2202 raise
2205def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
2206 """Remove empty parent directories up to stop_at."""
2207 parent = os.path.dirname(path)
2208 while parent and parent != stop_at:
2209 try:
2210 os.rmdir(parent)
2211 parent = os.path.dirname(parent)
2212 except FileNotFoundError:
2213 # Directory doesn't exist - stop trying
2214 break
2215 except OSError as e:
2216 if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
2217 # Directory not empty - stop trying
2218 break
2219 raise
2222def _check_symlink_matches(
2223 full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: ObjectID
2224) -> bool:
2225 """Check if symlink target matches expected target.
2227 Returns True if symlink matches, False if it doesn't match.
2228 """
2229 try:
2230 current_target = os.readlink(full_path)
2231 blob_obj = repo_object_store[entry_sha]
2232 expected_target = blob_obj.as_raw_string()
2233 if isinstance(current_target, str):
2234 current_target = current_target.encode()
2235 return current_target == expected_target
2236 except FileNotFoundError:
2237 # Symlink doesn't exist
2238 return False
2239 except OSError as e:
2240 if e.errno == errno.EINVAL:
2241 # Not a symlink
2242 return False
2243 raise
2246def _check_file_matches(
2247 repo_object_store: "BaseObjectStore",
2248 full_path: bytes,
2249 entry_sha: ObjectID,
2250 entry_mode: int,
2251 current_stat: os.stat_result,
2252 honor_filemode: bool,
2253 blob_normalizer: "FilterBlobNormalizer | None" = None,
2254 tree_path: bytes | None = None,
2255) -> bool:
2256 """Check if a file on disk matches the expected git object.
2258 Returns True if file matches, False if it doesn't match.
2259 """
2260 # Check mode first (if honor_filemode is True)
2261 if honor_filemode:
2262 current_mode = stat.S_IMODE(current_stat.st_mode)
2263 expected_mode = stat.S_IMODE(entry_mode)
2265 # For regular files, only check the user executable bit, not group/other permissions
2266 # This matches Git's behavior where umask differences don't count as modifications
2267 if stat.S_ISREG(current_stat.st_mode):
2268 # Normalize regular file modes to ignore group/other write permissions
2269 current_mode_normalized = (
2270 current_mode & 0o755
2271 ) # Keep only user rwx and all read+execute
2272 expected_mode_normalized = expected_mode & 0o755
2274 # For Git compatibility, regular files should be either 644 or 755
2275 if expected_mode_normalized not in (0o644, 0o755):
2276 expected_mode_normalized = 0o644 # Default for regular files
2277 if current_mode_normalized not in (0o644, 0o755):
2278 # Determine if it should be executable based on user execute bit
2279 if current_mode & 0o100: # User execute bit is set
2280 current_mode_normalized = 0o755
2281 else:
2282 current_mode_normalized = 0o644
2284 if current_mode_normalized != expected_mode_normalized:
2285 return False
2286 else:
2287 # For non-regular files (symlinks, etc.), check mode exactly
2288 if current_mode != expected_mode:
2289 return False
2291 # If mode matches (or we don't care), check content via size first
2292 blob_obj = repo_object_store[entry_sha]
2293 if current_stat.st_size != blob_obj.raw_length():
2294 return False
2296 # Size matches, check actual content
2297 try:
2298 with open(full_path, "rb") as f:
2299 current_content = f.read()
2300 expected_content = blob_obj.as_raw_string()
2301 if blob_normalizer and tree_path is not None:
2302 assert isinstance(blob_obj, Blob)
2303 normalized_blob = blob_normalizer.checkout_normalize(
2304 blob_obj, tree_path
2305 )
2306 expected_content = normalized_blob.as_raw_string()
2307 return current_content == expected_content
2308 except (FileNotFoundError, PermissionError, IsADirectoryError):
2309 return False
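# Editor's sketch: a compact restatement (hypothetical helper) of the mode
# normalization above. A regular file with the user execute bit set is
# canonicalized to 0o755 and everything else to 0o644, so group/other
# permission noise from the umask never counts as a modification.
def _example_normalize_reg_mode(mode: int) -> int:
    """Collapse a regular-file mode to git's canonical 0o644 or 0o755."""
    masked = mode & 0o755
    if masked in (0o644, 0o755):
        return masked
    return 0o755 if masked & 0o100 else 0o644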
2312def _transition_to_submodule(
2313 repo: "Repo",
2314 path: bytes,
2315 full_path: bytes,
2316 current_stat: os.stat_result | None,
2317 entry: IndexEntry | TreeEntry,
2318 index: Index,
2319) -> None:
2320 """Transition any type to submodule."""
2321 from .submodule import ensure_submodule_placeholder
2323 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2324 # Already a directory, just ensure .git file exists
2325 ensure_submodule_placeholder(repo, path)
2326 else:
2327 # Remove whatever is there and create submodule
2328 if current_stat is not None:
2329 _remove_file_with_readonly_handling(full_path)
2330 ensure_submodule_placeholder(repo, path)
2332 st = os.lstat(full_path)
2333 assert entry.sha is not None
2334 index[path] = index_entry_from_stat(st, entry.sha)
2337def _transition_to_file(
2338 object_store: "BaseObjectStore",
2339 path: bytes,
2340 full_path: bytes,
2341 current_stat: os.stat_result | None,
2342 entry: IndexEntry | TreeEntry,
2343 index: Index,
2344 honor_filemode: bool,
2345 symlink_fn: Callable[
2346 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
2347 ]
2348 | None,
2349 blob_normalizer: "FilterBlobNormalizer | None",
2350 tree_encoding: str = "utf-8",
2351) -> None:
2352 """Transition any type to regular file or symlink."""
2353 assert entry.sha is not None and entry.mode is not None
2354 # Check if we need to update
2355 if (
2356 current_stat is not None
2357 and stat.S_ISREG(current_stat.st_mode)
2358 and not stat.S_ISLNK(entry.mode)
2359 ):
2360 # File to file - check if update needed
2361 file_matches = _check_file_matches(
2362 object_store,
2363 full_path,
2364 entry.sha,
2365 entry.mode,
2366 current_stat,
2367 honor_filemode,
2368 blob_normalizer,
2369 path,
2370 )
2371 needs_update = not file_matches
2372 elif (
2373 current_stat is not None
2374 and stat.S_ISLNK(current_stat.st_mode)
2375 and stat.S_ISLNK(entry.mode)
2376 ):
2377 # Symlink to symlink - check if update needed
2378 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
2379 needs_update = not symlink_matches
2380 else:
2381 needs_update = True
2383 if not needs_update:
2384 # Just update index - current_stat should always be valid here since we're not updating
2385 assert current_stat is not None
2386 index[path] = index_entry_from_stat(current_stat, entry.sha)
2387 return
2389 # Remove existing entry if needed
2390 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2391 # Remove directory
2392 dir_contents = set(os.listdir(full_path))
2393 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2395 if git_file_name in dir_contents:
2396 if dir_contents != {git_file_name}:
2397 raise IsADirectoryError(
2398 f"Cannot replace submodule with untracked files: {full_path!r}"
2399 )
2400 shutil.rmtree(full_path)
2401 else:
2402 try:
2403 os.rmdir(full_path)
2404 except OSError as e:
2405 if e.errno in (errno.ENOTEMPTY, errno.EEXIST):
2406 raise IsADirectoryError(
2407 f"Cannot replace non-empty directory with file: {full_path!r}"
2408 )
2409 raise
2410 elif current_stat is not None:
2411 _remove_file_with_readonly_handling(full_path)
2413 # Ensure parent directory exists
2414 _ensure_parent_dir_exists(full_path)
2416 # Write the file
2417 blob_obj = object_store[entry.sha]
2418 assert isinstance(blob_obj, Blob)
2419 if blob_normalizer:
2420 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
2421 st = build_file_from_blob(
2422 blob_obj,
2423 entry.mode,
2424 full_path,
2425 honor_filemode=honor_filemode,
2426 tree_encoding=tree_encoding,
2427 symlink_fn=symlink_fn,
2428 )
2429 index[path] = index_entry_from_stat(st, entry.sha)
2432def _transition_to_absent(
2433 repo: "Repo",
2434 path: bytes,
2435 full_path: bytes,
2436 current_stat: os.stat_result | None,
2437 index: Index,
2438) -> None:
2439 """Remove any type of entry."""
2440 if current_stat is None:
2441 return
2443 if stat.S_ISDIR(current_stat.st_mode):
2444 # Check if it's a submodule directory
2445 dir_contents = set(os.listdir(full_path))
2446 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2448 if git_file_name in dir_contents and dir_contents == {git_file_name}:
2449 shutil.rmtree(full_path)
2450 else:
2451 try:
2452 os.rmdir(full_path)
2453 except OSError as e:
2454 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
2455 raise
2456 else:
2457 _remove_file_with_readonly_handling(full_path)
2459 try:
2460 del index[path]
2461 except KeyError:
2462 pass
2464 # Try to remove empty parent directories
2465 _remove_empty_parents(
2466 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2467 )
2470def detect_case_only_renames(
2471 changes: Sequence["TreeChange"],
2472 config: "Config",
2473) -> list["TreeChange"]:
2474 """Detect and transform case-only renames in a list of tree changes.
2476 This function identifies file renames that only differ in case (e.g.,
2477 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
2478 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
2479 based on the repository configuration.
2481 Args:
2482 changes: List of TreeChange objects representing file changes
2483 config: Repository configuration object
2485 Returns:
2486 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
2487 """
2488 from .diff_tree import (
2489 CHANGE_ADD,
2490 CHANGE_COPY,
2491 CHANGE_DELETE,
2492 CHANGE_MODIFY,
2493 CHANGE_RENAME,
2494 TreeChange,
2495 )
2497 # Build dictionaries of old and new paths with their normalized forms
2498 old_paths_normalized = {}
2499 new_paths_normalized = {}
2500 old_changes = {} # Map from old path to change object
2501 new_changes = {} # Map from new path to change object
2503 # Get the appropriate normalizer based on config
2504 normalize_func = get_path_element_normalizer(config)
2506 def normalize_path(path: bytes) -> bytes:
2507 """Normalize entire path using element normalization."""
2508 return b"/".join(normalize_func(part) for part in path.split(b"/"))
2510 # Pre-normalize all paths once to avoid repeated normalization
2511 for change in changes:
2512 if change.type == CHANGE_DELETE and change.old:
2513 assert change.old.path is not None
2514 try:
2515 normalized = normalize_path(change.old.path)
2516 except UnicodeDecodeError:
2517 import logging
2519 logging.warning(
2520 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2521 change.old.path,
2522 )
2523 else:
2524 old_paths_normalized[normalized] = change.old.path
2525 old_changes[change.old.path] = change
2526 elif change.type == CHANGE_RENAME and change.old:
2527 assert change.old.path is not None
2528 # Treat RENAME as DELETE + ADD for case-only detection
2529 try:
2530 normalized = normalize_path(change.old.path)
2531 except UnicodeDecodeError:
2532 import logging
2534 logging.warning(
2535 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2536 change.old.path,
2537 )
2538 else:
2539 old_paths_normalized[normalized] = change.old.path
2540 old_changes[change.old.path] = change
2542 if (
2543 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
2544 and change.new
2545 ):
2546 assert change.new.path is not None
2547 try:
2548 normalized = normalize_path(change.new.path)
2549 except UnicodeDecodeError:
2550 import logging
2552 logging.warning(
2553 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2554 change.new.path,
2555 )
2556 else:
2557 new_paths_normalized[normalized] = change.new.path
2558 new_changes[change.new.path] = change
2560 # Find case-only renames and transform changes
2561 case_only_renames = set()
2562 new_rename_changes = []
2564 for norm_path, old_path in old_paths_normalized.items():
2565 if norm_path in new_paths_normalized:
2566 new_path = new_paths_normalized[norm_path]
2567 if old_path != new_path:
2568 # Found a case-only rename
2569 old_change = old_changes[old_path]
2570 new_change = new_changes[new_path]
2572 # Replace the DELETE and ADD/MODIFY pair with a single rename:
2573 # the old entry comes from the DELETE, the new entry from the
2574 # ADD/MODIFY
2575 rename_change = TreeChange(
2576 CHANGE_RENAME, old_change.old, new_change.new
2577 )
2585 new_rename_changes.append(rename_change)
2587 # Mark the old changes for removal
2588 case_only_renames.add(old_change)
2589 case_only_renames.add(new_change)
2591 # Return new list with original ADD/DELETE changes replaced by renames
2592 result = [change for change in changes if change not in case_only_renames]
2593 result.extend(new_rename_changes)
2594 return result
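# Editor's sketch (assumed driver code): compute raw tree changes, then
# fold case-only DELETE/ADD pairs into renames before applying them.
def _example_changes_with_case_renames(
    repo: "Repo", old_tree_id: bytes, new_tree_id: bytes
) -> list["TreeChange"]:
    """Return tree changes with case-only renames folded in."""
    from .diff_tree import tree_changes

    changes = list(tree_changes(repo.object_store, old_tree_id, new_tree_id))
    return detect_case_only_renames(changes, repo.get_config())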
2597def update_working_tree(
2598 repo: "Repo",
2599 old_tree_id: bytes | None,
2600 new_tree_id: bytes,
2601 change_iterator: Iterator["TreeChange"],
2602 honor_filemode: bool = True,
2603 validate_path_element: Callable[[bytes], bool] | None = None,
2604 symlink_fn: Callable[
2605 [str | bytes | os.PathLike[str], str | bytes | os.PathLike[str]], None
2606 ]
2607 | None = None,
2608 force_remove_untracked: bool = False,
2609 blob_normalizer: "FilterBlobNormalizer | None" = None,
2610 tree_encoding: str = "utf-8",
2611 allow_overwrite_modified: bool = False,
2612) -> None:
2613 """Update the working tree and index to match a new tree.
2615 This function handles:
2616 - Adding new files
2617 - Updating modified files
2618 - Removing deleted files
2619 - Cleaning up empty directories
2621 Args:
2622 repo: Repository object
2623 old_tree_id: SHA of the tree before the update
2624 new_tree_id: SHA of the tree to update to
2625 change_iterator: Iterator of TreeChange objects to apply
2626 honor_filemode: An optional flag to honor core.filemode setting
2627 validate_path_element: Function to validate path elements to check out
2628 symlink_fn: Function to use for creating symlinks
2629 force_remove_untracked: If True, remove files that exist in working
2630 directory but not in target tree, even if old_tree_id is None
2631 blob_normalizer: An optional BlobNormalizer to use for converting line
2632 endings when writing blobs to the working directory.
2633 tree_encoding: Encoding used for tree paths (default: utf-8)
2634 allow_overwrite_modified: If False, raise an error when attempting to
2635 overwrite files that have been modified compared to old_tree_id
2636 """
2637 if validate_path_element is None:
2638 validate_path_element = validate_path_element_default
2640 from .diff_tree import (
2641 CHANGE_ADD,
2642 CHANGE_COPY,
2643 CHANGE_DELETE,
2644 CHANGE_MODIFY,
2645 CHANGE_RENAME,
2646 CHANGE_UNCHANGED,
2647 )
2649 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2650 index = repo.open_index()
2652 # Convert iterator to list since we need multiple passes
2653 changes = list(change_iterator)
2655 # Transform case-only renames on case-insensitive filesystems
2656 import platform
2658 default_ignore_case = platform.system() in ("Windows", "Darwin")
2659 config = repo.get_config()
2660 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
2662 if ignore_case:
2664 changes = detect_case_only_renames(changes, config)
2666 # Check for path conflicts where files need to become directories
2667 paths_becoming_dirs = set()
2668 for change in changes:
2669 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
2670 assert change.new is not None
2671 path = change.new.path
2672 assert path is not None
2673 if b"/" in path: # This is a file inside a directory
2674 # Check if any parent path exists as a file in the old tree or changes
2675 parts = path.split(b"/")
2676 for i in range(1, len(parts)):
2677 parent = b"/".join(parts[:i])
2678 # See if this parent path is being deleted (was a file, becoming a dir)
2679 for other_change in changes:
2680 if (
2681 other_change.type == CHANGE_DELETE
2682 and other_change.old
2683 and other_change.old.path == parent
2684 ):
2685 paths_becoming_dirs.add(parent)
2687 # Check if any path that needs to become a directory has been modified
2688 for path in paths_becoming_dirs:
2689 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2690 try:
2691 current_stat = os.lstat(full_path)
2692 except FileNotFoundError:
2693 continue # File doesn't exist, nothing to check
2694 except OSError as e:
2695 raise OSError(
2696 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2697 ) from e
2699 if stat.S_ISREG(current_stat.st_mode):
2700 # Find the old entry for this path
2701 old_change = None
2702 for change in changes:
2703 if (
2704 change.type == CHANGE_DELETE
2705 and change.old
2706 and change.old.path == path
2707 ):
2708 old_change = change
2709 break
2711 if old_change:
2712 # Check if file has been modified
2713 assert old_change.old is not None
2714 assert (
2715 old_change.old.sha is not None and old_change.old.mode is not None
2716 )
2717 file_matches = _check_file_matches(
2718 repo.object_store,
2719 full_path,
2720 old_change.old.sha,
2721 old_change.old.mode,
2722 current_stat,
2723 honor_filemode,
2724 blob_normalizer,
2725 path,
2726 )
2727 if not file_matches:
2728 raise OSError(
2729 f"Cannot replace modified file with directory: {path!r}"
2730 )
2732 # Check for uncommitted modifications before making any changes
2733 if not allow_overwrite_modified and old_tree_id:
2734 for change in changes:
2735 # Only check files that are being modified or deleted
2736 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
2737 path = change.old.path
2738 assert path is not None
2739 if path.startswith(b".git") or not validate_path(
2740 path, validate_path_element
2741 ):
2742 continue
2744 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2745 try:
2746 current_stat = os.lstat(full_path)
2747 except FileNotFoundError:
2748 continue # File doesn't exist, nothing to check
2749 except OSError as e:
2750 raise OSError(
2751 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2752 ) from e
2754 if stat.S_ISREG(current_stat.st_mode):
2755 # Check if working tree file differs from old tree
2756 assert change.old.sha is not None and change.old.mode is not None
2757 file_matches = _check_file_matches(
2758 repo.object_store,
2759 full_path,
2760 change.old.sha,
2761 change.old.mode,
2762 current_stat,
2763 honor_filemode,
2764 blob_normalizer,
2765 path,
2766 )
2767 if not file_matches:
2768 from .errors import WorkingTreeModifiedError
2770 raise WorkingTreeModifiedError(
2771 f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
2772 f"would be overwritten by checkout. "
2773 f"Please commit your changes or stash them before you switch branches."
2774 )
2776 # Apply the changes
2777 for change in changes:
2778 if change.type in (CHANGE_DELETE, CHANGE_RENAME):
2779 # Remove file/directory
2780 assert change.old is not None and change.old.path is not None
2781 path = change.old.path
2782 if path.startswith(b".git") or not validate_path(
2783 path, validate_path_element
2784 ):
2785 continue
2787 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2788 try:
2789 delete_stat: os.stat_result | None = os.lstat(full_path)
2790 except FileNotFoundError:
2791 delete_stat = None
2792 except OSError as e:
2793 raise OSError(
2794 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2795 ) from e
2797 _transition_to_absent(repo, path, full_path, delete_stat, index)
2799 if change.type in (
2800 CHANGE_ADD,
2801 CHANGE_MODIFY,
2802 CHANGE_UNCHANGED,
2803 CHANGE_COPY,
2804 CHANGE_RENAME,
2805 ):
2806 # Add or modify file
2807 assert (
2808 change.new is not None
2809 and change.new.path is not None
2810 and change.new.mode is not None
2811 )
2812 path = change.new.path
2813 if path.startswith(b".git") or not validate_path(
2814 path, validate_path_element
2815 ):
2816 continue
2818 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2819 try:
2820 modify_stat: os.stat_result | None = os.lstat(full_path)
2821 except FileNotFoundError:
2822 modify_stat = None
2823 except OSError as e:
2824 raise OSError(
2825 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2826 ) from e
2828 if S_ISGITLINK(change.new.mode):
2829 _transition_to_submodule(
2830 repo, path, full_path, modify_stat, change.new, index
2831 )
2832 else:
2833 _transition_to_file(
2834 repo.object_store,
2835 path,
2836 full_path,
2837 modify_stat,
2838 change.new,
2839 index,
2840 honor_filemode,
2841 symlink_fn,
2842 blob_normalizer,
2843 tree_encoding,
2844 )
2846 index.write()
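# Editor's sketch of a minimal checkout driver (inputs assumed): the change
# iterator produced by tree_changes is exactly what update_working_tree
# consumes.
def _example_checkout_tree(repo: "Repo", new_tree_id: bytes) -> None:
    """Move the working tree from HEAD's tree to new_tree_id."""
    from .diff_tree import tree_changes

    old_tree_id = repo[repo.head()].tree
    changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
    update_working_tree(repo, old_tree_id, new_tree_id, changes)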
2849def _stat_matches_entry(st: os.stat_result, entry: IndexEntry) -> bool:
2850 """Check if filesystem stat matches index entry stat.
2852 This is used to determine if a file might have changed without reading its content.
2853 Git uses this optimization to avoid expensive filter operations on unchanged files.
2855 Args:
2856 st: Filesystem stat result
2857 entry: Index entry to compare against
2858 Returns: True if stat matches and file is likely unchanged
2859 """
2860 # Get entry mtime with nanosecond precision if available
2861 if isinstance(entry.mtime, tuple):
2862 entry_mtime_sec = entry.mtime[0]
2863 entry_mtime_nsec = entry.mtime[1]
2864 else:
2865 entry_mtime_sec = int(entry.mtime)
2866 entry_mtime_nsec = 0
2868 # Compare modification time with nanosecond precision if available
2869 # This is important for fast workflows (e.g., stash) where files can be
2870 # modified multiple times within the same second
2871 if hasattr(st, "st_mtime_ns"):
2872 # Use nanosecond precision when available
2873 st_mtime_nsec = st.st_mtime_ns
2874 entry_mtime_nsec_total = entry_mtime_sec * 1_000_000_000 + entry_mtime_nsec
2875 if st_mtime_nsec != entry_mtime_nsec_total:
2876 return False
2877 else:
2878 # Fall back to second precision
2879 if int(st.st_mtime) != entry_mtime_sec:
2880 return False
2882 # Compare file size
2883 if st.st_size != entry.size:
2884 return False
2886 # If both mtime and size match, file is likely unchanged
2887 return True
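# Editor's sketch (hypothetical helper) of the timestamp arithmetic used
# above: an index mtime stored as (seconds, nanoseconds) only matches
# st_mtime_ns exactly, so two writes within one second still miss the
# fast path.
def _example_entry_mtime_ns(entry: IndexEntry) -> int:
    """Return the index entry's mtime in nanoseconds since the epoch."""
    if isinstance(entry.mtime, tuple):
        sec, nsec = entry.mtime
        return sec * 1_000_000_000 + nsec
    return int(entry.mtime) * 1_000_000_000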
2890def _check_entry_for_changes(
2891 tree_path: bytes,
2892 entry: IndexEntry | ConflictedIndexEntry,
2893 root_path: bytes,
2894 filter_blob_callback: Callable[[Blob, bytes], Blob] | None = None,
2895) -> bytes | None:
2896 """Check a single index entry for changes.
2898 Args:
2899 tree_path: Path in the tree
2900 entry: Index entry to check
2901 root_path: Root filesystem path
2902 filter_blob_callback: Optional callback to filter blobs
2903 Returns: tree_path if changed, None otherwise
2904 """
2905 if isinstance(entry, ConflictedIndexEntry):
2906 # Conflicted files are always unstaged
2907 return tree_path
2909 full_path = _tree_to_fs_path(root_path, tree_path)
2910 try:
2911 st = os.lstat(full_path)
2912 if stat.S_ISDIR(st.st_mode):
2913 if _has_directory_changed(tree_path, entry):
2914 return tree_path
2915 return None
2917 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
2918 return None
2920 # Optimization: If stat matches index entry (mtime and size unchanged),
2921 # we can skip reading and filtering the file entirely. This is a significant
2922 # performance improvement for repositories with many unchanged files.
2923 # Even with filters (e.g., LFS), if the file hasn't been modified (stat unchanged),
2924 # the filter output would be the same, so we can safely skip the expensive
2925 # filter operation. This addresses performance issues with LFS repositories
2926 # where filter operations can be very slow.
2927 if _stat_matches_entry(st, entry):
2928 return None
2930 blob = blob_from_path_and_stat(full_path, st)
2932 if filter_blob_callback is not None:
2933 blob = filter_blob_callback(blob, tree_path)
2934 except FileNotFoundError:
2935 # The file was removed, so we assume that counts as
2936 # different from whatever file used to exist.
2937 return tree_path
2938 else:
2939 if blob.id != entry.sha:
2940 return tree_path
2941 return None
2944def get_unstaged_changes(
2945 index: Index,
2946 root_path: str | bytes,
2947 filter_blob_callback: Callable[..., Any] | None = None,
2948 preload_index: bool = False,
2949) -> Generator[bytes, None, None]:
2950 """Walk through an index and check for differences against working tree.
2952 Args:
2953 index: index to check
2954 root_path: path in which to find files
2955 filter_blob_callback: Optional callback to filter blobs
2956 preload_index: If True, use parallel threads to check files (requires threading support)
2957 Returns: iterator over paths with unstaged changes
2958 """
2959 # For each entry in the index check the sha1 & ensure not staged
2960 if not isinstance(root_path, bytes):
2961 root_path = os.fsencode(root_path)
2963 if preload_index:
2964 # Use parallel processing for better performance on slow filesystems
2965 try:
2966 import multiprocessing
2967 from concurrent.futures import ThreadPoolExecutor
2968 except ImportError:
2969 # If threading is not available, fall back to serial processing
2970 preload_index = False
2971 else:
2972 # Collect all entries first
2973 entries = list(index.iteritems())
2975 # Use number of CPUs but cap at 8 threads to avoid overhead
2976 num_workers = min(multiprocessing.cpu_count(), 8)
2978 # Process entries in parallel
2979 with ThreadPoolExecutor(max_workers=num_workers) as executor:
2980 # Submit all tasks
2981 futures = [
2982 executor.submit(
2983 _check_entry_for_changes,
2984 tree_path,
2985 entry,
2986 root_path,
2987 filter_blob_callback,
2988 )
2989 for tree_path, entry in entries
2990 ]
2992 # Yield results in submission order (result() blocks per future)
2993 for future in futures:
2994 result = future.result()
2995 if result is not None:
2996 yield result
2998 if not preload_index:
2999 # Serial processing
3000 for tree_path, entry in index.iteritems():
3001 result = _check_entry_for_changes(
3002 tree_path, entry, root_path, filter_blob_callback
3003 )
3004 if result is not None:
3005 yield result
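# Editor's sketch (assumed repo handling): a "dirty files" listing built
# directly on the generator above.
def _example_print_unstaged(repo: "Repo") -> None:
    """Print each tracked path whose working-tree copy differs."""
    index = repo.open_index()
    for tree_path in get_unstaged_changes(index, repo.path):
        print(tree_path.decode("utf-8", errors="replace"))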
3008def _tree_to_fs_path(
3009 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
3010) -> bytes:
3011 """Convert a git tree path to a file system path.
3013 Args:
3014 root_path: Root filesystem path
3015 tree_path: Git tree path as bytes (encoded with tree_encoding)
3016 tree_encoding: Encoding used for tree paths (default: utf-8)
3018 Returns: File system path.
3019 """
3020 assert isinstance(tree_path, bytes)
3021 if os_sep_bytes != b"/":
3022 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
3023 else:
3024 sep_corrected_path = tree_path
3026 # On Windows, we need to handle tree path encoding properly
3027 if sys.platform == "win32":
3028 # Decode from tree encoding, then re-encode for filesystem
3029 try:
3030 tree_path_str = sep_corrected_path.decode(tree_encoding)
3031 sep_corrected_path = os.fsencode(tree_path_str)
3032 except UnicodeDecodeError:
3033 # If decoding fails, use the original bytes
3034 pass
3036 return os.path.join(root_path, sep_corrected_path)
3039def _fs_to_tree_path(fs_path: str | bytes, tree_encoding: str = "utf-8") -> bytes:
3040 """Convert a file system path to a git tree path.
3042 Args:
3043 fs_path: File system path.
3044 tree_encoding: Encoding to use for tree paths (default: utf-8)
3046 Returns: Git tree path as bytes (encoded with tree_encoding)
3047 """
3048 if not isinstance(fs_path, bytes):
3049 fs_path_bytes = os.fsencode(fs_path)
3050 else:
3051 fs_path_bytes = fs_path
3053 # On Windows, we need to ensure tree paths are properly encoded
3054 if sys.platform == "win32":
3055 try:
3056 # Decode from filesystem encoding, then re-encode with tree encoding
3057 fs_path_str = os.fsdecode(fs_path_bytes)
3058 fs_path_bytes = fs_path_str.encode(tree_encoding)
3059 except UnicodeDecodeError:
3060 # If filesystem decoding fails, use the original bytes
3061 pass
3063 if os_sep_bytes != b"/":
3064 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
3065 else:
3066 tree_path = fs_path_bytes
3067 return tree_path
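# Editor's sketch (hypothetical helper, POSIX-flavoured): the two
# converters above are inverses once the root prefix is stripped, since
# tree paths always use "/" while filesystem paths use os.sep.
def _example_roundtrip(root_path: bytes, tree_path: bytes) -> bytes:
    """Map a tree path onto disk and back; returns tree_path unchanged."""
    fs_path = _tree_to_fs_path(root_path, tree_path)
    relative = os.path.relpath(fs_path, root_path)
    return _fs_to_tree_path(relative)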
3070def index_entry_from_directory(st: os.stat_result, path: bytes) -> IndexEntry | None:
3071 """Create an index entry for a directory.
3073 This is only used for submodules (directories containing .git).
3075 Args:
3076 st: Stat result for the directory
3077 path: Path to the directory
3079 Returns:
3080 IndexEntry for a submodule, or None if not a submodule
3081 """
3082 if os.path.exists(os.path.join(path, b".git")):
3083 head = read_submodule_head(path)
3084 if head is None:
3085 return None
3086 return index_entry_from_stat(st, head, mode=S_IFGITLINK)
3087 return None
3090def index_entry_from_path(
3091 path: bytes, object_store: ObjectContainer | None = None
3092) -> IndexEntry | None:
3093 """Create an index from a filesystem path.
3095 This returns an index value for files, symlinks
3096 and tree references. for directories and
3097 non-existent files it returns None
3099 Args:
3100 path: Path to create an index entry for
3101 object_store: Optional object store to
3102 save new blobs in
3103 Returns: An index entry; None for directories
3104 """
3105 assert isinstance(path, bytes)
3106 st = os.lstat(path)
3107 if stat.S_ISDIR(st.st_mode):
3108 return index_entry_from_directory(st, path)
3110 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
3111 blob = blob_from_path_and_stat(path, st)
3112 if object_store is not None:
3113 object_store.add_object(blob)
3114 return index_entry_from_stat(st, blob.id)
3116 return None
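# Editor's sketch of a single-file staging helper (name and flow are
# assumptions): hash the file into the object store and record the
# resulting entry under its tree path.
def _example_stage_file(repo: "Repo", index: Index, tree_path: bytes) -> None:
    """Stage one file, as 'git add' would record it in the index."""
    repo_path = os.fsencode(repo.path)
    fs_path = _tree_to_fs_path(repo_path, tree_path)
    entry = index_entry_from_path(fs_path, object_store=repo.object_store)
    if entry is not None:
        index[tree_path] = entry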
3119def iter_fresh_entries(
3120 paths: Iterable[bytes],
3121 root_path: bytes,
3122 object_store: ObjectContainer | None = None,
3123) -> Iterator[tuple[bytes, IndexEntry | None]]:
3124 """Iterate over current versions of index entries on disk.
3126 Args:
3127 paths: Paths to iterate over
3128 root_path: Root path to access from
3129 object_store: Optional store to save new blobs in
3130 Returns: Iterator over path, index_entry
3131 """
3132 for path in paths:
3133 p = _tree_to_fs_path(root_path, path)
3134 try:
3135 entry = index_entry_from_path(p, object_store=object_store)
3136 except (FileNotFoundError, IsADirectoryError):
3137 entry = None
3138 yield path, entry
3141def iter_fresh_objects(
3142 paths: Iterable[bytes],
3143 root_path: bytes,
3144 include_deleted: bool = False,
3145 object_store: ObjectContainer | None = None,
3146) -> Iterator[tuple[bytes, ObjectID | None, int | None]]:
3147 """Iterate over versions of objects on disk referenced by index.
3149 Args:
3150 paths: Paths to check
3151 root_path: Root path to access from
3152 include_deleted: Include deleted entries with sha and
3153 mode set to None
3154 object_store: Optional object store to report new items to
3155 Returns: Iterator over path, sha, mode
3156 """
3157 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
3158 if entry is None:
3159 if include_deleted:
3160 yield path, None, None
3161 else:
3162 yield path, entry.sha, cleanup_mode(entry.mode)
3165def refresh_index(index: Index, root_path: bytes) -> None:
3166 """Refresh the contents of an index.
3168 This is roughly equivalent to running 'git add -u'.
3170 Args:
3171 index: Index to update
3172 root_path: Root filesystem path
3173 """
3174 for path, entry in iter_fresh_entries(index, root_path):
3175 if entry:
3176 index[path] = entry
3179class locked_index:
3180 """Lock the index while making modifications.
3182 Works as a context manager.
3183 """
3185 _file: "_GitFile"
3187 def __init__(self, path: bytes | str) -> None:
3188 """Initialize locked_index."""
3189 self._path = path
3191 def __enter__(self) -> Index:
3192 """Enter context manager and lock index."""
3193 f = GitFile(self._path, "wb")
3194 self._file = f
3195 self._index = Index(self._path)
3196 return self._index
3198 def __exit__(
3199 self,
3200 exc_type: type | None,
3201 exc_value: BaseException | None,
3202 traceback: types.TracebackType | None,
3203 ) -> None:
3204 """Exit context manager and unlock index."""
3205 if exc_type is not None:
3206 self._file.abort()
3207 return
3208 try:
3209 f = SHA1Writer(self._file)
3210 write_index_dict(f, self._index._byname)
3211 except BaseException:
3212 self._file.abort()
3213 else:
3214 f.close()
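# Editor's sketch of using the context manager above (Repo.index_path() is
# dulwich's accessor for the index file location). On a clean exit the
# modified index is written out; on an exception the lock is aborted.
def _example_drop_entry(repo: "Repo", tree_path: bytes) -> None:
    """Delete one entry from the index under the index lock."""
    with locked_index(repo.index_path()) as index:
        try:
            del index[tree_path]
        except KeyError:
            pass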