# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
22"""Parser for the git index file format."""
24import errno
25import os
26import shutil
27import stat
28import struct
29import sys
30import types
31from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
32from dataclasses import dataclass
33from enum import Enum
34from typing import (
35 IO,
36 TYPE_CHECKING,
37 Any,
38 BinaryIO,
39 Callable,
40 Optional,
41 Union,
42)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .filters import FilterBlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    TreeEntry,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# Type alias for recursive tree structure used in commit_tree
if sys.version_info >= (3, 10):
    TreeDict = dict[bytes, Union["TreeDict", tuple[int, bytes]]]
else:
    TreeDict = dict[bytes, Any]

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)
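
# Illustrative example (not part of the upstream module): 300 = 0b1_0010_1100,
# so the low 7 bits (0x2C) are emitted first with the continuation bit set,
# followed by the remaining bits:
#
#   _encode_varint(300) == b"\xac\x02"
#   _encode_varint(0) == b"\x00"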


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
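
# Illustrative round trip, continuing the example above (the second element
# is the offset just past the varint):
#
#   _decode_varint(_encode_varint(300)) == (300, 2)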


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"
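
# Illustrative example: following b"src/lib.py" with b"src/main.py" shares the
# 4-byte prefix b"src/", so 6 trailing bytes of the previous path are dropped
# and b"main.py" is appended:
#
#   _compress_path(b"src/main.py", b"src/lib.py") == b"\x06main.py\x00"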


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
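
# Illustrative round trip with the _compress_path example above (9 is the
# offset just past the terminating NUL):
#
#   _decompress_path(b"\x06main.py\x00", 0, b"src/lib.py")
#   == (b"src/main.py", 9)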


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
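
# Illustrative example: the stage occupies bits 12-13 of the flags word, so a
# flags value of 0x2000 decodes to stage 2:
#
#   Stage((0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT) == Stage.MERGE_CONFLICT_THIS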


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
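
# Illustrative examples of the /-delimited helpers above:
#
#   pathsplit(b"foo/bar/baz") == (b"foo/bar", b"baz")
#   pathsplit(b"baz") == (b"", b"baz")
#   pathjoin(b"foo", b"", b"baz") == b"foo/baz"   # empty parts are dropped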


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, _consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
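
# Illustrative sketch: the 12-byte header this parses is the b"DIRC" magic
# followed by two big-endian 32-bit words, e.g. for a v2 index with 3 entries:
#
#   b"DIRC" + struct.pack(">LL", 2, 3)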


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary keyed by path.

    Entries at a non-zero stage are grouped into ConflictedIndexEntry
    objects, since a path alone is not unique during a merge conflict.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: Sequence[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require index format version 3 or later; bump the
    # version if any entry needs them.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    # Sanity check: extended flags cannot be represented below version 3.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: IO[bytes],
    entries: Mapping[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[Sequence[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are sorted by path and, for conflicts, by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
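
# Illustrative examples: only the file type and the user execute bit survive:
#
#   cleanup_mode(0o100664) == 0o100644   # group-write stripped
#   cleanup_mode(0o100764) == 0o100755   # any user x-bit means 755
#   cleanup_mode(0o120777) == 0o120000   # symlink; permission bits dropped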


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike[str]],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
          newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, TreeDict] = {b"": {}}

    def add_tree(path: bytes) -> TreeDict:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: TreeDict = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
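
# Illustrative usage sketch (assumes MemoryObjectStore from
# dulwich.object_store; not part of the upstream module):
#
#   from dulwich.object_store import MemoryObjectStore
#
#   store = MemoryObjectStore()
#   blob = Blob.from_string(b"hello\n")
#   store.add_object(blob)
#   tree_id = commit_tree(store, [(b"docs/hello.txt", blob.id, 0o100644)])
#   # tree_id now names a root tree containing the subtree b"docs"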


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
      (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            assert name is not None and mode is not None and sha is not None
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[
        Callable[
            [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
            None,
        ]
    ] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()
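
# Illustrative example: NTFS silently strips trailing dots and spaces, so
# names like b".GIT. " must normalize to the same key as b".git":
#
#   _normalize_path_element_ntfs(b".GIT. ") == b".git"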


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Validate a path by checking each /-separated element.

    The default element validator just refuses .git and similar names.
    """
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
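
# Illustrative examples: every /-separated element must pass the validator,
# so any path traversing a .git directory or using b".." is rejected:
#
#   validate_path(b"src/main.py") is True
#   validate_path(b".git/config") is False
#   validate_path(b"a/../b") is False   # b".." is an invalid element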


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[
        Callable[
            [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
            None,
        ]
    ] = None,
    blob_normalizer: Optional["FilterBlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
      in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        assert (
            entry.path is not None and entry.mode is not None and entry.sha is not None
        )
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, as Git tracked a file and not a
    directory.

    Return True if the given path should be considered changed, and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Walk up the directory tree to find the first existing parent
        current = parent_dir
        parents_to_check: list[bytes] = []

        while current and not os.path.exists(current):
            parents_to_check.insert(0, current)
            new_parent = os.path.dirname(current)
            if new_parent == current:
                # Reached the root or can't go up further
                break
            current = new_parent

        # Check if the existing parent (if any) is a directory
        if current and os.path.exists(current) and not os.path.isdir(current):
            raise OSError(
                f"Cannot create directory, parent path is a file: {current!r}"
            )

        # Now check each parent we need to create isn't blocked by an existing file
        for parent_path in parents_to_check:
            if os.path.exists(parent_path) and not os.path.isdir(parent_path):
                raise OSError(
                    f"Cannot create directory, parent path is a file: {parent_path!r}"
                )

        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink matches, False if it doesn't match.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target == expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return False
        raise


def _check_file_matches(
    repo_object_store: "BaseObjectStore",
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["FilterBlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                assert isinstance(blob_obj, Blob)
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False


def _transition_to_submodule(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    entry: Union[IndexEntry, TreeEntry],
    index: Index,
) -> None:
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    assert entry.sha is not None
    index[path] = index_entry_from_stat(st, entry.sha)
1969def _transition_to_file(
1970 object_store: "BaseObjectStore",
1971 path: bytes,
1972 full_path: bytes,
1973 current_stat: Optional[os.stat_result],
1974 entry: Union[IndexEntry, TreeEntry],
1975 index: Index,
1976 honor_filemode: bool,
1977 symlink_fn: Optional[
1978 Callable[
1979 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
1980 None,
1981 ]
1982 ],
1983 blob_normalizer: Optional["FilterBlobNormalizer"],
1984 tree_encoding: str = "utf-8",
1985) -> None:
1986 """Transition any type to regular file or symlink."""
1987 assert entry.sha is not None and entry.mode is not None
1988 # Check if we need to update
1989 if (
1990 current_stat is not None
1991 and stat.S_ISREG(current_stat.st_mode)
1992 and not stat.S_ISLNK(entry.mode)
1993 ):
1994 # File to file - check if update needed
1995 file_matches = _check_file_matches(
1996 object_store,
1997 full_path,
1998 entry.sha,
1999 entry.mode,
2000 current_stat,
2001 honor_filemode,
2002 blob_normalizer,
2003 path,
2004 )
2005 needs_update = not file_matches
2006 elif (
2007 current_stat is not None
2008 and stat.S_ISLNK(current_stat.st_mode)
2009 and stat.S_ISLNK(entry.mode)
2010 ):
2011 # Symlink to symlink - check if update needed
2012 symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
2013 needs_update = not symlink_matches
2014 else:
2015 needs_update = True
2017 if not needs_update:
        # Just update the index; needs_update is only False when an existing
        # file or symlink matched, so current_stat must be set here
2019 assert current_stat is not None
2020 index[path] = index_entry_from_stat(current_stat, entry.sha)
2021 return
2023 # Remove existing entry if needed
2024 if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
2025 # Remove directory
2026 dir_contents = set(os.listdir(full_path))
2027 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2029 if git_file_name in dir_contents:
2030 if dir_contents != {git_file_name}:
2031 raise IsADirectoryError(
2032 f"Cannot replace submodule with untracked files: {full_path!r}"
2033 )
2034 shutil.rmtree(full_path)
2035 else:
2036 try:
2037 os.rmdir(full_path)
2038 except OSError as e:
2039 if e.errno == errno.ENOTEMPTY:
2040 raise IsADirectoryError(
2041 f"Cannot replace non-empty directory with file: {full_path!r}"
2042 )
2043 raise
2044 elif current_stat is not None:
2045 _remove_file_with_readonly_handling(full_path)
2047 # Ensure parent directory exists
2048 _ensure_parent_dir_exists(full_path)
2050 # Write the file
2051 blob_obj = object_store[entry.sha]
2052 assert isinstance(blob_obj, Blob)
2053 if blob_normalizer:
2054 blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
2055 st = build_file_from_blob(
2056 blob_obj,
2057 entry.mode,
2058 full_path,
2059 honor_filemode=honor_filemode,
2060 tree_encoding=tree_encoding,
2061 symlink_fn=symlink_fn,
2062 )
2063 index[path] = index_entry_from_stat(st, entry.sha)
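# --- Illustrative sketch (not part of dulwich's API) ------------------------
# The short-circuit above restated: content comparison can only skip the
# rewrite when the on-disk kind already matches the target kind; any change
# of kind (directory -> file, absent -> file, file -> symlink, ...) forces a
# full rewrite. Hypothetical helper name:
def _sketch_same_kind(current_mode: int, target_mode: int) -> bool:
    """True when an on-disk entry is already the right kind for the target."""
    if stat.S_ISLNK(target_mode):
        return stat.S_ISLNK(current_mode)
    return stat.S_ISREG(current_mode)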
2066def _transition_to_absent(
2067 repo: "Repo",
2068 path: bytes,
2069 full_path: bytes,
2070 current_stat: Optional[os.stat_result],
2071 index: Index,
2072) -> None:
2073 """Remove any type of entry."""
2074 if current_stat is None:
2075 return
2077 if stat.S_ISDIR(current_stat.st_mode):
2078 # Check if it's a submodule directory
2079 dir_contents = set(os.listdir(full_path))
2080 git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
2082 if git_file_name in dir_contents and dir_contents == {git_file_name}:
2083 shutil.rmtree(full_path)
2084 else:
2085 try:
2086 os.rmdir(full_path)
2087 except OSError as e:
2088 if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
2089 raise
2090 else:
2091 _remove_file_with_readonly_handling(full_path)
2093 try:
2094 del index[path]
2095 except KeyError:
2096 pass
2098 # Try to remove empty parent directories
2099 _remove_empty_parents(
2100 full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2101 )
2104def detect_case_only_renames(
2105 changes: Sequence["TreeChange"],
2106 config: "Config",
2107) -> list["TreeChange"]:
2108 """Detect and transform case-only renames in a list of tree changes.
2110 This function identifies file renames that only differ in case (e.g.,
2111 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
2112 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
2113 based on the repository configuration.
2115 Args:
2116 changes: List of TreeChange objects representing file changes
2117 config: Repository configuration object
2119 Returns:
2120 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
2121 """
2122 from .diff_tree import (
2123 CHANGE_ADD,
2124 CHANGE_COPY,
2125 CHANGE_DELETE,
2126 CHANGE_MODIFY,
2127 CHANGE_RENAME,
2128 TreeChange,
2129 )
2131 # Build dictionaries of old and new paths with their normalized forms
2132 old_paths_normalized = {}
2133 new_paths_normalized = {}
2134 old_changes = {} # Map from old path to change object
2135 new_changes = {} # Map from new path to change object
2137 # Get the appropriate normalizer based on config
2138 normalize_func = get_path_element_normalizer(config)
2140 def normalize_path(path: bytes) -> bytes:
2141 """Normalize entire path using element normalization."""
2142 return b"/".join(normalize_func(part) for part in path.split(b"/"))
    # Pre-normalize all paths once, skipping (with a warning) any path that
    # is not valid UTF-8
    import logging

    def try_normalize_path(path: bytes) -> Optional[bytes]:
        try:
            return normalize_path(path)
        except UnicodeDecodeError:
            logging.warning(
                "Skipping case-only rename detection for path with invalid UTF-8: %r",
                path,
            )
            return None

    for change in changes:
        # Treat RENAME like DELETE + ADD for case-only detection
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            assert change.old.path is not None
            normalized = try_normalize_path(change.old.path)
            if normalized is not None:
                old_paths_normalized[normalized] = change.old.path
                old_changes[change.old.path] = change

        if (
            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
            and change.new
        ):
            assert change.new.path is not None
            normalized = try_normalize_path(change.new.path)
            if normalized is not None:
                new_paths_normalized[normalized] = change.new.path
                new_changes[change.new.path] = change
2194 # Find case-only renames and transform changes
2195 case_only_renames = set()
2196 new_rename_changes = []
2198 for norm_path, old_path in old_paths_normalized.items():
2199 if norm_path in new_paths_normalized:
2200 new_path = new_paths_normalized[norm_path]
2201 if old_path != new_path:
2202 # Found a case-only rename
2203 old_change = old_changes[old_path]
2204 new_change = new_changes[new_path]
                # Replace the DELETE + ADD/MODIFY pair with a single rename:
                # in both cases the old entry comes from the DELETE side and
                # the new entry from the ADD/MODIFY side
                rename_change = TreeChange(
                    CHANGE_RENAME, old_change.old, new_change.new
                )
                new_rename_changes.append(rename_change)
2221 # Mark the old changes for removal
2222 case_only_renames.add(old_change)
2223 case_only_renames.add(new_change)
2225 # Return new list with original ADD/DELETE changes replaced by renames
2226 result = [change for change in changes if change not in case_only_renames]
2227 result.extend(new_rename_changes)
2228 return result
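# --- Illustrative usage sketch (hypothetical names and values) ---------------
def _sketch_case_rename_example() -> None:
    """Show a DELETE + ADD pair collapsing into a single rename.

    Assumes a configuration under which path normalization folds case
    (e.g. core.ignorecase on Windows/macOS); otherwise the pair is
    returned unchanged.
    """
    from dulwich.config import ConfigDict
    from dulwich.diff_tree import TreeChange
    from dulwich.objects import TreeEntry

    sha = b"0" * 40  # placeholder blob sha
    changes = [
        TreeChange.delete(TreeEntry(b"README.txt", 0o100644, sha)),
        TreeChange.add(TreeEntry(b"readme.txt", 0o100644, sha)),
    ]
    # Under case-folding normalization this yields one CHANGE_RENAME
    # from README.txt to readme.txt.
    print(detect_case_only_renames(changes, ConfigDict()))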
2231def update_working_tree(
2232 repo: "Repo",
2233 old_tree_id: Optional[bytes],
2234 new_tree_id: bytes,
2235 change_iterator: Iterator["TreeChange"],
2236 honor_filemode: bool = True,
2237 validate_path_element: Optional[Callable[[bytes], bool]] = None,
2238 symlink_fn: Optional[
2239 Callable[
2240 [Union[str, bytes, os.PathLike[str]], Union[str, bytes, os.PathLike[str]]],
2241 None,
2242 ]
2243 ] = None,
2244 force_remove_untracked: bool = False,
2245 blob_normalizer: Optional["FilterBlobNormalizer"] = None,
2246 tree_encoding: str = "utf-8",
2247 allow_overwrite_modified: bool = False,
2248) -> None:
2249 """Update the working tree and index to match a new tree.
2251 This function handles:
2252 - Adding new files
2253 - Updating modified files
2254 - Removing deleted files
2255 - Cleaning up empty directories
2257 Args:
2258 repo: Repository object
2259 old_tree_id: SHA of the tree before the update
2260 new_tree_id: SHA of the tree to update to
2261 change_iterator: Iterator of TreeChange objects to apply
      honor_filemode: Whether to honor the core.filemode setting
2263 validate_path_element: Function to validate path elements to check out
2264 symlink_fn: Function to use for creating symlinks
2265 force_remove_untracked: If True, remove files that exist in working
2266 directory but not in target tree, even if old_tree_id is None
2267 blob_normalizer: An optional BlobNormalizer to use for converting line
2268 endings when writing blobs to the working directory.
2269 tree_encoding: Encoding used for tree paths (default: utf-8)
2270 allow_overwrite_modified: If False, raise an error when attempting to
2271 overwrite files that have been modified compared to old_tree_id
2272 """
2273 if validate_path_element is None:
2274 validate_path_element = validate_path_element_default
2276 from .diff_tree import (
2277 CHANGE_ADD,
2278 CHANGE_COPY,
2279 CHANGE_DELETE,
2280 CHANGE_MODIFY,
2281 CHANGE_RENAME,
2282 CHANGE_UNCHANGED,
2283 )
2285 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2286 index = repo.open_index()
2288 # Convert iterator to list since we need multiple passes
2289 changes = list(change_iterator)
2291 # Transform case-only renames on case-insensitive filesystems
2292 import platform
2294 default_ignore_case = platform.system() in ("Windows", "Darwin")
2295 config = repo.get_config()
2296 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
    if ignore_case:
        changes = detect_case_only_renames(changes, config)
2302 # Check for path conflicts where files need to become directories
2303 paths_becoming_dirs = set()
2304 for change in changes:
2305 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
2306 assert change.new is not None
2307 path = change.new.path
2308 assert path is not None
2309 if b"/" in path: # This is a file inside a directory
2310 # Check if any parent path exists as a file in the old tree or changes
2311 parts = path.split(b"/")
2312 for i in range(1, len(parts)):
2313 parent = b"/".join(parts[:i])
2314 # See if this parent path is being deleted (was a file, becoming a dir)
2315 for other_change in changes:
2316 if (
2317 other_change.type == CHANGE_DELETE
2318 and other_change.old
2319 and other_change.old.path == parent
2320 ):
2321 paths_becoming_dirs.add(parent)
2323 # Check if any path that needs to become a directory has been modified
2324 for path in paths_becoming_dirs:
2325 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2326 try:
2327 current_stat = os.lstat(full_path)
2328 except FileNotFoundError:
2329 continue # File doesn't exist, nothing to check
2330 except OSError as e:
2331 raise OSError(
2332 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2333 ) from e
2335 if stat.S_ISREG(current_stat.st_mode):
2336 # Find the old entry for this path
2337 old_change = None
2338 for change in changes:
2339 if (
2340 change.type == CHANGE_DELETE
2341 and change.old
2342 and change.old.path == path
2343 ):
2344 old_change = change
2345 break
2347 if old_change:
2348 # Check if file has been modified
2349 assert old_change.old is not None
2350 assert (
2351 old_change.old.sha is not None and old_change.old.mode is not None
2352 )
2353 file_matches = _check_file_matches(
2354 repo.object_store,
2355 full_path,
2356 old_change.old.sha,
2357 old_change.old.mode,
2358 current_stat,
2359 honor_filemode,
2360 blob_normalizer,
2361 path,
2362 )
2363 if not file_matches:
2364 raise OSError(
2365 f"Cannot replace modified file with directory: {path!r}"
2366 )
2368 # Check for uncommitted modifications before making any changes
2369 if not allow_overwrite_modified and old_tree_id:
2370 for change in changes:
2371 # Only check files that are being modified or deleted
2372 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
2373 path = change.old.path
2374 assert path is not None
2375 if path.startswith(b".git") or not validate_path(
2376 path, validate_path_element
2377 ):
2378 continue
2380 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2381 try:
2382 current_stat = os.lstat(full_path)
2383 except FileNotFoundError:
2384 continue # File doesn't exist, nothing to check
2385 except OSError as e:
2386 raise OSError(
2387 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2388 ) from e
2390 if stat.S_ISREG(current_stat.st_mode):
2391 # Check if working tree file differs from old tree
2392 assert change.old.sha is not None and change.old.mode is not None
2393 file_matches = _check_file_matches(
2394 repo.object_store,
2395 full_path,
2396 change.old.sha,
2397 change.old.mode,
2398 current_stat,
2399 honor_filemode,
2400 blob_normalizer,
2401 path,
2402 )
2403 if not file_matches:
2404 from .errors import WorkingTreeModifiedError
2406 raise WorkingTreeModifiedError(
2407 f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
2408 f"would be overwritten by checkout. "
2409 f"Please commit your changes or stash them before you switch branches."
2410 )
2412 # Apply the changes
2413 for change in changes:
2414 if change.type in (CHANGE_DELETE, CHANGE_RENAME):
2415 # Remove file/directory
2416 assert change.old is not None and change.old.path is not None
2417 path = change.old.path
2418 if path.startswith(b".git") or not validate_path(
2419 path, validate_path_element
2420 ):
2421 continue
2423 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2424 try:
2425 delete_stat: Optional[os.stat_result] = os.lstat(full_path)
2426 except FileNotFoundError:
2427 delete_stat = None
2428 except OSError as e:
2429 raise OSError(
2430 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2431 ) from e
2433 _transition_to_absent(repo, path, full_path, delete_stat, index)
2435 if change.type in (
2436 CHANGE_ADD,
2437 CHANGE_MODIFY,
2438 CHANGE_UNCHANGED,
2439 CHANGE_COPY,
2440 CHANGE_RENAME,
2441 ):
2442 # Add or modify file
2443 assert (
2444 change.new is not None
2445 and change.new.path is not None
2446 and change.new.mode is not None
2447 )
2448 path = change.new.path
2449 if path.startswith(b".git") or not validate_path(
2450 path, validate_path_element
2451 ):
2452 continue
2454 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2455 try:
2456 modify_stat: Optional[os.stat_result] = os.lstat(full_path)
2457 except FileNotFoundError:
2458 modify_stat = None
2459 except OSError as e:
2460 raise OSError(
2461 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2462 ) from e
2464 if S_ISGITLINK(change.new.mode):
2465 _transition_to_submodule(
2466 repo, path, full_path, modify_stat, change.new, index
2467 )
2468 else:
2469 _transition_to_file(
2470 repo.object_store,
2471 path,
2472 full_path,
2473 modify_stat,
2474 change.new,
2475 index,
2476 honor_filemode,
2477 symlink_fn,
2478 blob_normalizer,
2479 tree_encoding,
2480 )
2482 index.write()
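# --- Illustrative usage sketch (hypothetical paths) ---------------------------
def _sketch_checkout_example() -> None:
    """Apply the diff between two trees to the working directory."""
    from dulwich.diff_tree import tree_changes
    from dulwich.repo import Repo

    repo = Repo("/tmp/example")  # hypothetical repository
    old_tree_id = repo[repo.head()].tree
    new_tree_id = old_tree_id  # stand-in for the tree being checked out
    changes = tree_changes(repo.object_store, old_tree_id, new_tree_id)
    update_working_tree(repo, old_tree_id, new_tree_id, changes)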
2485def _check_entry_for_changes(
2486 tree_path: bytes,
2487 entry: Union[IndexEntry, ConflictedIndexEntry],
2488 root_path: bytes,
2489 filter_blob_callback: Optional[Callable[[bytes, bytes], bytes]] = None,
2490) -> Optional[bytes]:
2491 """Check a single index entry for changes.
2493 Args:
2494 tree_path: Path in the tree
2495 entry: Index entry to check
2496 root_path: Root filesystem path
2497 filter_blob_callback: Optional callback to filter blobs
2498 Returns: tree_path if changed, None otherwise
2499 """
2500 if isinstance(entry, ConflictedIndexEntry):
2501 # Conflicted files are always unstaged
2502 return tree_path
2504 full_path = _tree_to_fs_path(root_path, tree_path)
2505 try:
2506 st = os.lstat(full_path)
2507 if stat.S_ISDIR(st.st_mode):
2508 if _has_directory_changed(tree_path, entry):
2509 return tree_path
2510 return None
2512 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
2513 return None
2515 blob = blob_from_path_and_stat(full_path, st)
2517 if filter_blob_callback is not None:
2518 blob.data = filter_blob_callback(blob.data, tree_path)
2519 except FileNotFoundError:
2520 # The file was removed, so we assume that counts as
2521 # different from whatever file used to exist.
2522 return tree_path
2523 else:
2524 if blob.id != entry.sha:
2525 return tree_path
2526 return None
2529def get_unstaged_changes(
2530 index: Index,
2531 root_path: Union[str, bytes],
2532 filter_blob_callback: Optional[Callable[..., Any]] = None,
2533 preload_index: bool = False,
2534) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against the working tree.
2537 Args:
2538 index: index to check
2539 root_path: path in which to find files
2540 filter_blob_callback: Optional callback to filter blobs
2541 preload_index: If True, use parallel threads to check files (requires threading support)
2542 Returns: iterator over paths with unstaged changes
2543 """
    # For each entry in the index, check whether the working-tree copy still
    # matches the recorded sha (i.e. has no unstaged changes)
2545 if not isinstance(root_path, bytes):
2546 root_path = os.fsencode(root_path)
2548 if preload_index:
2549 # Use parallel processing for better performance on slow filesystems
2550 try:
2551 import multiprocessing
2552 from concurrent.futures import ThreadPoolExecutor
2553 except ImportError:
2554 # If threading is not available, fall back to serial processing
2555 preload_index = False
2556 else:
2557 # Collect all entries first
2558 entries = list(index.iteritems())
2560 # Use number of CPUs but cap at 8 threads to avoid overhead
2561 num_workers = min(multiprocessing.cpu_count(), 8)
2563 # Process entries in parallel
2564 with ThreadPoolExecutor(max_workers=num_workers) as executor:
2565 # Submit all tasks
2566 futures = [
2567 executor.submit(
2568 _check_entry_for_changes,
2569 tree_path,
2570 entry,
2571 root_path,
2572 filter_blob_callback,
2573 )
2574 for tree_path, entry in entries
2575 ]
                # Collect results in submission order (keeps output deterministic)
2578 for future in futures:
2579 result = future.result()
2580 if result is not None:
2581 yield result
2583 if not preload_index:
2584 # Serial processing
2585 for tree_path, entry in index.iteritems():
2586 result = _check_entry_for_changes(
2587 tree_path, entry, root_path, filter_blob_callback
2588 )
2589 if result is not None:
2590 yield result
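# --- Illustrative usage sketch (hypothetical path) ----------------------------
def _sketch_unstaged_example() -> None:
    """Print every tracked path whose working-tree copy differs."""
    from dulwich.repo import Repo

    repo = Repo("/tmp/example")  # hypothetical repository
    index = repo.open_index()
    for tree_path in get_unstaged_changes(index, repo.path):
        print(tree_path.decode("utf-8", errors="replace"))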
2593def _tree_to_fs_path(
2594 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
2595) -> bytes:
2596 """Convert a git tree path to a file system path.
2598 Args:
2599 root_path: Root filesystem path
2600 tree_path: Git tree path as bytes (encoded with tree_encoding)
2601 tree_encoding: Encoding used for tree paths (default: utf-8)
2603 Returns: File system path.
2604 """
2605 assert isinstance(tree_path, bytes)
2606 if os_sep_bytes != b"/":
2607 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
2608 else:
2609 sep_corrected_path = tree_path
2611 # On Windows, we need to handle tree path encoding properly
2612 if sys.platform == "win32":
2613 # Decode from tree encoding, then re-encode for filesystem
2614 try:
2615 tree_path_str = sep_corrected_path.decode(tree_encoding)
2616 sep_corrected_path = os.fsencode(tree_path_str)
2617 except UnicodeDecodeError:
2618 # If decoding fails, use the original bytes
2619 pass
2621 return os.path.join(root_path, sep_corrected_path)
2624def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes:
2625 """Convert a file system path to a git tree path.
2627 Args:
2628 fs_path: File system path.
2629 tree_encoding: Encoding to use for tree paths (default: utf-8)
2631 Returns: Git tree path as bytes (encoded with tree_encoding)
2632 """
2633 if not isinstance(fs_path, bytes):
2634 fs_path_bytes = os.fsencode(fs_path)
2635 else:
2636 fs_path_bytes = fs_path
2638 # On Windows, we need to ensure tree paths are properly encoded
2639 if sys.platform == "win32":
2640 try:
2641 # Decode from filesystem encoding, then re-encode with tree encoding
2642 fs_path_str = os.fsdecode(fs_path_bytes)
2643 fs_path_bytes = fs_path_str.encode(tree_encoding)
2644 except UnicodeDecodeError:
2645 # If filesystem decoding fails, use the original bytes
2646 pass
2648 if os_sep_bytes != b"/":
2649 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
2650 else:
2651 tree_path = fs_path_bytes
2652 return tree_path
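# --- Illustrative sketch ------------------------------------------------------
# The two helpers above are inverses apart from separator translation and
# (on Windows) re-encoding through tree_encoding. A round trip under those
# assumptions:
def _sketch_path_round_trip() -> None:
    fs_path = _tree_to_fs_path(b"/repo", b"docs/readme.md")
    # On POSIX fs_path is b"/repo/docs/readme.md"; on Windows the separator
    # becomes b"\\" and the bytes are re-encoded for the filesystem.
    relative = fs_path[len(b"/repo") + len(os_sep_bytes) :]
    assert _fs_to_tree_path(relative) == b"docs/readme.md"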
2655def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
2656 """Create an index entry for a directory.
2658 This is only used for submodules (directories containing .git).
2660 Args:
2661 st: Stat result for the directory
2662 path: Path to the directory
2664 Returns:
2665 IndexEntry for a submodule, or None if not a submodule
2666 """
2667 if os.path.exists(os.path.join(path, b".git")):
2668 head = read_submodule_head(path)
2669 if head is None:
2670 return None
2671 return index_entry_from_stat(st, head, mode=S_IFGITLINK)
2672 return None
2675def index_entry_from_path(
2676 path: bytes, object_store: Optional[ObjectContainer] = None
2677) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index entry for files, symlinks and tree references
    (submodules). For plain directories it returns None; for non-existent
    paths the underlying os.lstat raises FileNotFoundError, which callers
    are expected to handle.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to save new blobs in
    Returns: An index entry, or None for plain directories
    """
2690 assert isinstance(path, bytes)
2691 st = os.lstat(path)
2692 if stat.S_ISDIR(st.st_mode):
2693 return index_entry_from_directory(st, path)
2695 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
2696 blob = blob_from_path_and_stat(path, st)
2697 if object_store is not None:
2698 object_store.add_object(blob)
2699 return index_entry_from_stat(st, blob.id)
2701 return None
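# --- Illustrative usage sketch (hypothetical path) ----------------------------
def _sketch_entry_example() -> None:
    """Build an index entry for a file without touching an object store."""
    entry = index_entry_from_path(b"/tmp/example/file.txt")
    if entry is not None:
        print(entry.sha, oct(entry.mode))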
2704def iter_fresh_entries(
2705 paths: Iterable[bytes],
2706 root_path: bytes,
2707 object_store: Optional[ObjectContainer] = None,
2708) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
2709 """Iterate over current versions of index entries on disk.
2711 Args:
2712 paths: Paths to iterate over
2713 root_path: Root path to access from
2714 object_store: Optional store to save new blobs in
2715 Returns: Iterator over path, index_entry
2716 """
2717 for path in paths:
2718 p = _tree_to_fs_path(root_path, path)
2719 try:
2720 entry = index_entry_from_path(p, object_store=object_store)
2721 except (FileNotFoundError, IsADirectoryError):
2722 entry = None
2723 yield path, entry
2726def iter_fresh_objects(
2727 paths: Iterable[bytes],
2728 root_path: bytes,
2729 include_deleted: bool = False,
2730 object_store: Optional[ObjectContainer] = None,
2731) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
2732 """Iterate over versions of objects on disk referenced by index.
2734 Args:
2735 paths: Paths to check
2736 root_path: Root path to access from
2737 include_deleted: Include deleted entries with sha and
2738 mode set to None
2739 object_store: Optional object store to report new items to
2740 Returns: Iterator over path, sha, mode
2741 """
2742 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
2743 if entry is None:
2744 if include_deleted:
2745 yield path, None, None
2746 else:
2747 yield path, entry.sha, cleanup_mode(entry.mode)
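# --- Illustrative usage sketch (hypothetical path) ----------------------------
def _sketch_fresh_objects_example() -> None:
    """List the current sha/mode of tracked paths, including deletions."""
    from dulwich.repo import Repo

    repo = Repo("/tmp/example")  # hypothetical repository
    index = repo.open_index()
    for path, sha, mode in iter_fresh_objects(
        list(index), os.fsencode(repo.path), include_deleted=True
    ):
        # sha and mode are None for paths that vanished from disk
        print(path, sha, mode)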
2750def refresh_index(index: Index, root_path: bytes) -> None:
2751 """Refresh the contents of an index.
    This is roughly equivalent to running 'git add -u': every tracked path
    is re-examined on disk and its index entry updated (entries whose files
    have disappeared are left untouched).
2755 Args:
2756 index: Index to update
2757 root_path: Root filesystem path
2758 """
2759 for path, entry in iter_fresh_entries(index, root_path):
2760 if entry:
2761 index[path] = entry
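# --- Illustrative usage sketch (hypothetical path) ----------------------------
def _sketch_refresh_example() -> None:
    """Re-stat every tracked file and fold the results back into the index."""
    from dulwich.repo import Repo

    repo = Repo("/tmp/example")  # hypothetical repository
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()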
2764class locked_index:
2765 """Lock the index while making modifications.
2767 Works as a context manager.
2768 """
2770 _file: "_GitFile"
2772 def __init__(self, path: Union[bytes, str]) -> None:
2773 """Initialize locked_index."""
2774 self._path = path
2776 def __enter__(self) -> Index:
2777 """Enter context manager and lock index."""
2778 f = GitFile(self._path, "wb")
2779 self._file = f
2780 self._index = Index(self._path)
2781 return self._index
2783 def __exit__(
2784 self,
2785 exc_type: Optional[type],
2786 exc_value: Optional[BaseException],
2787 traceback: Optional[types.TracebackType],
2788 ) -> None:
2789 """Exit context manager and unlock index."""
2790 if exc_type is not None:
2791 self._file.abort()
2792 return
2793 try:
2794 f = SHA1Writer(self._file)
2795 write_index_dict(f, self._index._byname)
2796 except BaseException:
2797 self._file.abort()
2798 else:
2799 f.close()
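# --- Illustrative usage sketch (hypothetical paths) ---------------------------
def _sketch_locked_index_example() -> None:
    """Mutate the index while holding the lock file."""
    with locked_index("/tmp/example/.git/index") as index:
        entry = index_entry_from_path(b"/tmp/example/file.txt")
        if entry is not None:
            index[b"file.txt"] = entry
    # On a clean exit the lock file is written and atomically renamed over
    # the index; on an exception the lock is aborted and the index is left
    # untouched.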