# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
    IO,
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .line_ending import BlobNormalizer
    from .object_store import BaseObjectStore
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
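

# Illustrative sketch (not part of dulwich): a quick round trip through the
# varint helpers above. The _example_* name is hypothetical; the helpers are
# module-private, so this is for demonstration rather than public API use.
def _example_varint_roundtrip() -> None:
    # 300 = 0b10_0101100; the low 7 bits are emitted first, with the high
    # bit of each byte used as a continuation marker.
    encoded = _encode_varint(300)
    assert encoded == b"\xac\x02"  # 0xAC = 0x2C | 0x80 (continue), then 0x02
    value, end = _decode_varint(encoded)
    assert (value, end) == (300, 2)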


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
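

# Illustrative sketch (not part of dulwich): index v4 stores each path as
# "strip N bytes from the previous path, then append this suffix". The
# _example_* name is hypothetical.
def _example_path_compression_roundtrip() -> None:
    # b"src/main.py" shares the 4-byte prefix b"src/" with b"src/lib.py",
    # so 6 trailing bytes are dropped and b"main.py" is appended.
    data = _compress_path(b"src/main.py", b"src/lib.py")
    assert data == b"\x06main.py\x00"  # varint(6) + suffix + NUL
    path, end = _decompress_path(data, 0, b"src/lib.py")
    assert path == b"src/main.py" and end == len(data)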


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    """Represents the stage of an index entry during merge conflicts."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    """Represents a serialized index entry as stored in the index file.

    This dataclass holds the raw data for an index entry before it's
    parsed into the more user-friendly IndexEntry format.
    """

    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Extract the stage from the flags field.

        Returns:
          Stage enum value indicating merge conflict state
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
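

# Illustrative sketch (not part of dulwich): the merge stage lives in bits
# 12-13 of the 16-bit flags word, so flags == 0x2000 decodes to stage 2
# ("this"/ours). The _example_* name is hypothetical.
def _example_stage_bits() -> None:
    flags = 0x2000
    stage = Stage((flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
    assert stage is Stage.MERGE_CONFLICT_THIS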


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        """Initialize TreeExtension.

        Args:
          entries: List of tree cache entries (path, sha, flags)
        """
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        """Parse TreeExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          TreeExtension instance
        """
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize TreeExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        """Initialize ResolveUndoExtension.

        Args:
          entries: List of (path, stages) where stages is a list of (stage, sha) tuples
        """
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        """Parse ResolveUndoExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          ResolveUndoExtension instance
        """
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        """Serialize ResolveUndoExtension to bytes.

        Returns:
          Serialized extension data
        """
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        """Initialize UntrackedExtension.

        Args:
          data: Raw untracked cache data
        """
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Parse UntrackedExtension from bytes.

        Args:
          data: Raw bytes to parse

        Returns:
          UntrackedExtension instance
        """
        return cls(data)


@dataclass
class IndexEntry:
    """Represents an entry in the Git index.

    This is a higher-level representation of an index entry that includes
    parsed data and convenience methods.
    """

    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Create an IndexEntry from a SerializedIndexEntry.

        Args:
          serialized: SerializedIndexEntry to convert

        Returns:
          New IndexEntry instance
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Serialize this entry with a given name and stage.

        Args:
          name: Path name for the entry
          stage: Merge conflict stage

        Returns:
          SerializedIndexEntry ready for writing to disk
        """
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Get the merge conflict stage of this entry.

        Returns:
          Stage enum value
        """
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Initialize ConflictedIndexEntry.

        Args:
          ancestor: The common ancestor entry
          this: The current branch entry
          other: The other branch entry
        """
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
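

# Illustrative sketch (not part of dulwich): tree paths are bytes and always
# use "/" regardless of the host OS separator. The _example_* name is
# hypothetical.
def _example_path_helpers() -> None:
    assert pathsplit(b"src/sub/mod.py") == (b"src/sub", b"mod.py")
    assert pathsplit(b"toplevel.py") == (b"", b"toplevel.py")
    # pathjoin drops empty components, so rejoining a split result is safe:
    assert pathjoin(*pathsplit(b"toplevel.py")) == b"toplevel.py"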


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: IO[bytes], t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
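

# Illustrative sketch (not part of dulwich): cache times are stored as two
# big-endian 32-bit words (seconds, nanoseconds); ints and floats are
# normalized to that pair on write. The _example_* name is hypothetical.
def _example_cache_time_roundtrip() -> None:
    import io

    buf = io.BytesIO()
    write_cache_time(buf, 1700000000.5)  # float -> (secs, nsecs)
    buf.seek(0)
    assert read_cache_time(buf) == (1700000000, 500000000)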


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: IO[bytes], entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedIndexFormat exception.

        Args:
          version: The unsupported index format version
        """
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries


def write_index_extension(f: IO[bytes], extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    The dict is keyed by path; conflicted paths map to a
    ConflictedIndexEntry holding the individual stages.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: IO[bytes],
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # STEP 1: check if any extended_flags are set
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Force or bump the version to 3
        version = 3
    # The rest is unchanged, but you might insert a final check:
    if version < 3:
        # Double-check no extended flags appear
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Proceed with the existing code to write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: IO[bytes],
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))
    write_index(f, entries_list, version=version, extensions=extensions)
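

# Illustrative sketch (not part of dulwich): an in-memory round trip through
# write_index/read_index. The trailing SHA-1 checksum is added by
# higher-level writers such as Index.write, so it is absent here. The
# _example_* name is hypothetical.
def _example_index_roundtrip() -> None:
    import io

    entry = IndexEntry(
        ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
        uid=0, gid=0, size=5, sha=b"0" * 40,
    ).serialize(b"hello.txt", Stage.NORMAL)

    buf = io.BytesIO()
    write_index(buf, [entry])
    buf.seek(0)
    assert [e.name for e in read_index(buf)] == [b"hello.txt"]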


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
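

# Illustrative sketch (not part of dulwich): on-disk modes collapse to the
# handful of values Git stores in trees; only the owner execute bit decides
# between 0o100644 and 0o100755. The _example_* name is hypothetical.
def _example_cleanup_mode() -> None:
    assert cleanup_mode(0o100664) == 0o100644  # group-write is discarded
    assert cleanup_mode(0o100775) == 0o100755  # owner +x wins -> 755
    assert cleanup_mode(0o120777) == stat.S_IFLNK  # symlinks keep no perms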


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Get the path to the index file.

        Returns:
          Path to the index file
        """
        return self._filename

    def __repr__(self) -> str:
        """Return string representation of Index."""
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    f,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(f)
                write_index_dict(
                    sha1_writer,
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(sha1_reader)
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Check if a path exists in the index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Check if the index contains any conflicted entries.

        Returns:
          True if any entries are conflicted, False otherwise
        """
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Set an entry in the index."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Delete an entry from the index."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs in the index.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Get an iterator over (path, entry) pairs.

        Returns:
          Iterator of (path, entry) tuples
        """
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Update the index with multiple entries.

        Args:
          entries: Dictionary mapping paths to index entries
        """
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Generate all paths in the index.

        Yields:
          Path names as bytes
        """
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
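

# Illustrative sketch (not part of dulwich): stage one blob in a fresh Index,
# write it out, and read it back. The tempfile paths and the _example_* name
# are hypothetical.
def _example_index_usage() -> None:
    import tempfile

    blob = Blob.from_string(b"hello\n")
    index_path = os.path.join(tempfile.mkdtemp(), "index")

    index = Index(index_path, read=False)
    index[b"hello.txt"] = IndexEntry(
        ctime=0, mtime=0, dev=0, ino=0, mode=0o100644,
        uid=0, gid=0, size=6, sha=blob.id,
    )
    index.write()

    assert Index(index_path).get_sha1(b"hello.txt") == blob.id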


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
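

# Illustrative sketch (not part of dulwich): commit_tree builds nested Tree
# objects bottom-up from flat (path, sha, mode) triples; MemoryObjectStore
# keeps everything in memory. The _example_* name is hypothetical.
def _example_commit_tree() -> None:
    from .object_store import MemoryObjectStore

    store = MemoryObjectStore()
    blob = Blob.from_string(b"hello\n")
    store.add_object(blob)

    tree_id = commit_tree(store, [(b"docs/readme.txt", blob.id, 0o100644)])
    # Both the root tree and the intermediate "docs" tree are now in store.
    assert tree_id in store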


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode, will be derived from stat if not provided
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )
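

# Illustrative sketch (not part of dulwich): turning an os.stat_result into
# an IndexEntry. The all-zeros SHA is a placeholder; real callers pass the
# hex id of the blob hashed from the file. The _example_* name is
# hypothetical.
def _example_index_entry_from_stat() -> None:
    import tempfile

    fd, path = tempfile.mkstemp()
    os.write(fd, b"data")
    os.close(fd)

    entry = index_entry_from_stat(os.lstat(path), b"0" * 40)
    assert entry.size == 4
    assert entry.mode in (0o100644, 0o100755)  # derived via cleanup_mode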


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """Windows-specific error for symlink creation failures.

        This error is raised when symlink creation fails on Windows,
        typically due to lack of developer mode or administrator privileges.
        """

        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            """Initialize WindowsSymlinkPermissionError."""
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        """Create a symbolic link on Windows with better error handling.

        Args:
          src: Source path for the symlink
          dst: Destination path where symlink will be created
          target_is_directory: Whether the target is a directory
          dir_fd: Optional directory file descriptor

        Raises:
          WindowsSymlinkPermissionError: If symlink creation fails due to permissions
        """
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[
        Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None]
    ] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding to use for tree contents
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    """Validate a path element using default rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid, False otherwise
    """
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    """Validate a path element using NTFS filesystem rules.

    Args:
      element: Path element to validate

    Returns:
      True if path element is valid for NTFS, False otherwise
    """
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    else:
        return True
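

# Illustrative sketch (not part of dulwich): every /-separated element is
# screened, and the NTFS normalizer also catches the 8.3 short name of
# ".git" and trailing-dot variants. The _example_* name is hypothetical.
def _example_validate_paths() -> None:
    assert validate_path(b"src/main.py")
    assert not validate_path(b".git/hooks/post-checkout")
    assert not validate_path(b"GIT~1/config", validate_path_element_ntfs)
    assert not validate_path(b".git./config", validate_path_element_ntfs)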


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[
        Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None]
    ] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
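

# Illustrative sketch (not part of dulwich): build a tree in memory, then
# materialize it into a temporary working directory. The tempfile paths and
# the _example_* name are hypothetical.
def _example_build_index_from_tree() -> None:
    import tempfile

    from .object_store import MemoryObjectStore

    store = MemoryObjectStore()
    blob = Blob.from_string(b"print('hi')\n")
    store.add_object(blob)
    tree_id = commit_tree(store, [(b"hello.py", blob.id, 0o100644)])

    workdir = tempfile.mkdtemp()
    build_index_from_tree(workdir, os.path.join(workdir, "index"), store, tree_id)
    assert os.path.exists(os.path.join(workdir, "hello.py"))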


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
      tree_encoding: Encoding to use for tree contents
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, as Git tracked a file and not a
    directory.

    Return True if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Walk up the directory tree to find the first existing parent
        current = parent_dir
        parents_to_check: list[bytes] = []

        while current and not os.path.exists(current):
            parents_to_check.insert(0, current)
            new_parent = os.path.dirname(current)
            if new_parent == current:
                # Reached the root or can't go up further
                break
            current = new_parent

        # Check if the existing parent (if any) is a directory
        if current and os.path.exists(current) and not os.path.isdir(current):
            raise OSError(
                f"Cannot create directory, parent path is a file: {current!r}"
            )

        # Now check each parent we need to create isn't blocked by an existing file
        for parent_path in parents_to_check:
            if os.path.exists(parent_path) and not os.path.isdir(parent_path):
                raise OSError(
                    f"Cannot create directory, parent path is a file: {parent_path!r}"
                )

        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink matches, False if it doesn't match.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target == expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return False
        raise


def _check_file_matches(
    repo_object_store: "BaseObjectStore",
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                assert isinstance(blob_obj, Blob)
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False


def _transition_to_submodule(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    entry: IndexEntry,
    index: Index,
) -> None:
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_file(
    object_store: "BaseObjectStore",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    entry: IndexEntry,
    index: Index,
    honor_filemode: bool,
    symlink_fn: Optional[
        Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None]
    ],
    blob_normalizer: Optional["BlobNormalizer"],
    tree_encoding: str = "utf-8",
) -> None:
    """Transition any type to regular file or symlink."""
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        file_matches = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
        needs_update = not file_matches
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
        needs_update = not symlink_matches
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        assert current_stat is not None
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_absent(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    index: Index,
) -> None:
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )
2082def detect_case_only_renames(
2083 changes: list["TreeChange"],
2084 config: "Config",
2085) -> list["TreeChange"]:
2086 """Detect and transform case-only renames in a list of tree changes.
2088 This function identifies file renames that only differ in case (e.g.,
2089 README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
2090 CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
2091 based on the repository configuration.
2093 Args:
2094 changes: List of TreeChange objects representing file changes
2095 config: Repository configuration object
2097 Returns:
2098 New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
2099 """
2100 from .diff_tree import (
2101 CHANGE_ADD,
2102 CHANGE_COPY,
2103 CHANGE_DELETE,
2104 CHANGE_MODIFY,
2105 CHANGE_RENAME,
2106 TreeChange,
2107 )
2109 # Build dictionaries of old and new paths with their normalized forms
2110 old_paths_normalized = {}
2111 new_paths_normalized = {}
2112 old_changes = {} # Map from old path to change object
2113 new_changes = {} # Map from new path to change object
2115 # Get the appropriate normalizer based on config
2116 normalize_func = get_path_element_normalizer(config)
2118 def normalize_path(path: bytes) -> bytes:
2119 """Normalize entire path using element normalization."""
2120 return b"/".join(normalize_func(part) for part in path.split(b"/"))
2122 # Pre-normalize all paths once to avoid repeated normalization
2123 for change in changes:
        # DELETE contributes its old path; a RENAME is treated as DELETE +
        # ADD for case-only detection, so its old path is recorded the same
        # way.
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            try:
                normalized = normalize_path(change.old.path)
            except UnicodeDecodeError:
                import logging

                logging.warning(
                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
                    change.old.path,
                )
            else:
                old_paths_normalized[normalized] = change.old.path
                old_changes[change.old.path] = change
2152 if (
2153 change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
2154 and change.new
2155 ):
2156 try:
2157 normalized = normalize_path(change.new.path)
2158 except UnicodeDecodeError:
2159 import logging
2161 logging.warning(
2162 "Skipping case-only rename detection for path with invalid UTF-8: %r",
2163 change.new.path,
2164 )
2165 else:
2166 new_paths_normalized[normalized] = change.new.path
2167 new_changes[change.new.path] = change
2169 # Find case-only renames and transform changes
2170 case_only_renames = set()
2171 new_rename_changes = []
2173 for norm_path, old_path in old_paths_normalized.items():
2174 if norm_path in new_paths_normalized:
2175 new_path = new_paths_normalized[norm_path]
2176 if old_path != new_path:
2177 # Found a case-only rename
2178 old_change = old_changes[old_path]
2179 new_change = new_changes[new_path]
                # Replace the DELETE and ADD/MODIFY pair with a single rename,
                # using the old entry from the delete side and the new entry
                # from the add/modify side
                rename_change = TreeChange(CHANGE_RENAME, old_change.old, new_change.new)
2194 new_rename_changes.append(rename_change)
2196 # Mark the old changes for removal
2197 case_only_renames.add(old_change)
2198 case_only_renames.add(new_change)
2200 # Return new list with original ADD/DELETE changes replaced by renames
2201 result = [change for change in changes if change not in case_only_renames]
2202 result.extend(new_rename_changes)
2203 return result
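# Hedged usage sketch (hypothetical demo; placeholder sha, and it assumes the
# normalizer obtained from the config case-folds path elements): a DELETE of
# b"README.txt" plus an ADD of b"readme.txt" collapses into a single rename.
def _demo_case_only_rename() -> None:
    from .config import ConfigDict
    from .diff_tree import CHANGE_RENAME, TreeChange
    from .objects import TreeEntry

    config = ConfigDict()
    config.set((b"core",), b"ignorecase", b"true")
    sha = b"0" * 40  # placeholder object id
    changes = [
        TreeChange.delete(TreeEntry(b"README.txt", 0o100644, sha)),
        TreeChange.add(TreeEntry(b"readme.txt", 0o100644, sha)),
    ]
    result = detect_case_only_renames(changes, config)
    assert result[0].type == CHANGE_RENAME  # expected under the assumptions above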
2206def update_working_tree(
2207 repo: "Repo",
2208 old_tree_id: Optional[bytes],
2209 new_tree_id: bytes,
2210 change_iterator: Iterator["TreeChange"],
2211 honor_filemode: bool = True,
2212 validate_path_element: Optional[Callable[[bytes], bool]] = None,
2213 symlink_fn: Optional[
2214 Callable[[Union[str, bytes, os.PathLike], Union[str, bytes, os.PathLike]], None]
2215 ] = None,
2216 force_remove_untracked: bool = False,
2217 blob_normalizer: Optional["BlobNormalizer"] = None,
2218 tree_encoding: str = "utf-8",
2219 allow_overwrite_modified: bool = False,
2220) -> None:
2221 """Update the working tree and index to match a new tree.
2223 This function handles:
2224 - Adding new files
2225 - Updating modified files
2226 - Removing deleted files
2227 - Cleaning up empty directories
2229 Args:
2230 repo: Repository object
2231 old_tree_id: SHA of the tree before the update
2232 new_tree_id: SHA of the tree to update to
2233 change_iterator: Iterator of TreeChange objects to apply
        honor_filemode: Whether to honor the core.filemode setting when
            applying file modes
2235 validate_path_element: Function to validate path elements to check out
2236 symlink_fn: Function to use for creating symlinks
2237 force_remove_untracked: If True, remove files that exist in working
2238 directory but not in target tree, even if old_tree_id is None
2239 blob_normalizer: An optional BlobNormalizer to use for converting line
2240 endings when writing blobs to the working directory.
2241 tree_encoding: Encoding used for tree paths (default: utf-8)
2242 allow_overwrite_modified: If False, raise an error when attempting to
2243 overwrite files that have been modified compared to old_tree_id
2244 """
2245 if validate_path_element is None:
2246 validate_path_element = validate_path_element_default
2248 from .diff_tree import (
2249 CHANGE_ADD,
2250 CHANGE_COPY,
2251 CHANGE_DELETE,
2252 CHANGE_MODIFY,
2253 CHANGE_RENAME,
2254 CHANGE_UNCHANGED,
2255 )
2257 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2258 index = repo.open_index()
2260 # Convert iterator to list since we need multiple passes
2261 changes = list(change_iterator)
2263 # Transform case-only renames on case-insensitive filesystems
2264 import platform
2266 default_ignore_case = platform.system() in ("Windows", "Darwin")
2267 config = repo.get_config()
2268 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
    if ignore_case:
        changes = detect_case_only_renames(changes, config)
2274 # Check for path conflicts where files need to become directories
2275 paths_becoming_dirs = set()
    delete_changes = {
        change.old.path: change
        for change in changes
        if change.type == CHANGE_DELETE and change.old
    }
    for change in changes:
        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
            path = change.new.path
            if b"/" in path:  # This is a file inside a directory
                # Any parent that is also being deleted was a file and is
                # now becoming a directory
                parts = path.split(b"/")
                for i in range(1, len(parts)):
                    parent = b"/".join(parts[:i])
                    if parent in delete_changes:
                        paths_becoming_dirs.add(parent)
2293 # Check if any path that needs to become a directory has been modified
2294 for path in paths_becoming_dirs:
2295 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2296 try:
2297 current_stat = os.lstat(full_path)
2298 except FileNotFoundError:
2299 continue # File doesn't exist, nothing to check
2300 except OSError as e:
2301 raise OSError(
2302 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2303 ) from e
2305 if stat.S_ISREG(current_stat.st_mode):
            # Find the delete change for this path (collected above)
            old_change = delete_changes.get(path)
2317 if old_change:
2318 # Check if file has been modified
2319 file_matches = _check_file_matches(
2320 repo.object_store,
2321 full_path,
2322 old_change.old.sha,
2323 old_change.old.mode,
2324 current_stat,
2325 honor_filemode,
2326 blob_normalizer,
2327 path,
2328 )
2329 if not file_matches:
2330 raise OSError(
2331 f"Cannot replace modified file with directory: {path!r}"
2332 )
2334 # Check for uncommitted modifications before making any changes
2335 if not allow_overwrite_modified and old_tree_id:
2336 for change in changes:
2337 # Only check files that are being modified or deleted
2338 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
2339 path = change.old.path
2340 if path.startswith(b".git") or not validate_path(
2341 path, validate_path_element
2342 ):
2343 continue
2345 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2346 try:
2347 current_stat = os.lstat(full_path)
2348 except FileNotFoundError:
2349 continue # File doesn't exist, nothing to check
2350 except OSError as e:
2351 raise OSError(
2352 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2353 ) from e
2355 if stat.S_ISREG(current_stat.st_mode):
2356 # Check if working tree file differs from old tree
2357 file_matches = _check_file_matches(
2358 repo.object_store,
2359 full_path,
2360 change.old.sha,
2361 change.old.mode,
2362 current_stat,
2363 honor_filemode,
2364 blob_normalizer,
2365 path,
2366 )
2367 if not file_matches:
2368 from .errors import WorkingTreeModifiedError
2370 raise WorkingTreeModifiedError(
2371 f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
2372 f"would be overwritten by checkout. "
2373 f"Please commit your changes or stash them before you switch branches."
2374 )
2376 # Apply the changes
2377 for change in changes:
2378 if change.type in (CHANGE_DELETE, CHANGE_RENAME):
2379 # Remove file/directory
2380 path = change.old.path
            # Skip the .git directory itself, but not paths that merely
            # begin with ".git", such as .gitignore
            if (
                path == b".git"
                or path.startswith(b".git/")
                or not validate_path(path, validate_path_element)
            ):
                continue
2386 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2387 try:
2388 delete_stat: Optional[os.stat_result] = os.lstat(full_path)
2389 except FileNotFoundError:
2390 delete_stat = None
2391 except OSError as e:
2392 raise OSError(
2393 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2394 ) from e
2396 _transition_to_absent(repo, path, full_path, delete_stat, index)
2398 if change.type in (
2399 CHANGE_ADD,
2400 CHANGE_MODIFY,
2401 CHANGE_UNCHANGED,
2402 CHANGE_COPY,
2403 CHANGE_RENAME,
2404 ):
2405 # Add or modify file
2406 path = change.new.path
            # Skip the .git directory itself, but not paths that merely
            # begin with ".git", such as .gitignore
            if (
                path == b".git"
                or path.startswith(b".git/")
                or not validate_path(path, validate_path_element)
            ):
                continue
2412 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2413 try:
2414 modify_stat: Optional[os.stat_result] = os.lstat(full_path)
2415 except FileNotFoundError:
2416 modify_stat = None
2417 except OSError as e:
2418 raise OSError(
2419 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2420 ) from e
2422 if S_ISGITLINK(change.new.mode):
2423 _transition_to_submodule(
2424 repo, path, full_path, modify_stat, change.new, index
2425 )
2426 else:
2427 _transition_to_file(
2428 repo.object_store,
2429 path,
2430 full_path,
2431 modify_stat,
2432 change.new,
2433 index,
2434 honor_filemode,
2435 symlink_fn,
2436 blob_normalizer,
2437 tree_encoding,
2438 )
2440 index.write()
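# Hedged usage sketch (hypothetical helper; `target_commit` is any commit id
# present in the object store): a minimal checkout driven by tree_changes.
def _demo_checkout(repo: "Repo", target_commit: bytes) -> None:
    from .diff_tree import tree_changes

    old_tree = repo[repo.head()].tree
    new_tree = repo[target_commit].tree
    changes = tree_changes(repo.object_store, old_tree, new_tree)
    update_working_tree(repo, old_tree, new_tree, changes)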
2443def get_unstaged_changes(
2444 index: Index,
2445 root_path: Union[str, bytes],
2446 filter_blob_callback: Optional[Callable] = None,
2447) -> Generator[bytes, None, None]:
2448 """Walk through an index and check for differences against working tree.
2450 Args:
2451 index: index to check
2452 root_path: path in which to find files
2453 filter_blob_callback: Optional callback to filter blobs
2454 Returns: iterator over paths with unstaged changes
2455 """
    # For each index entry, compare the working-tree file against the staged sha1
2457 if not isinstance(root_path, bytes):
2458 root_path = os.fsencode(root_path)
2460 for tree_path, entry in index.iteritems():
2461 full_path = _tree_to_fs_path(root_path, tree_path)
2462 if isinstance(entry, ConflictedIndexEntry):
2463 # Conflicted files are always unstaged
2464 yield tree_path
2465 continue
2467 try:
2468 st = os.lstat(full_path)
2469 if stat.S_ISDIR(st.st_mode):
2470 if _has_directory_changed(tree_path, entry):
2471 yield tree_path
2472 continue
2474 if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
2475 continue
2477 blob = blob_from_path_and_stat(full_path, st)
2479 if filter_blob_callback is not None:
2480 blob = filter_blob_callback(blob, tree_path)
2481 except FileNotFoundError:
2482 # The file was removed, so we assume that counts as
2483 # different from whatever file used to exist.
2484 yield tree_path
2485 else:
2486 if blob.id != entry.sha:
2487 yield tree_path
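# Hedged usage sketch (hypothetical helper): list the paths whose working-tree
# contents no longer match their staged blobs.
def _demo_unstaged_paths(repo: "Repo") -> list[bytes]:
    return list(get_unstaged_changes(repo.open_index(), repo.path))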
2490def _tree_to_fs_path(
2491 root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
2492) -> bytes:
2493 """Convert a git tree path to a file system path.
2495 Args:
2496 root_path: Root filesystem path
2497 tree_path: Git tree path as bytes (encoded with tree_encoding)
2498 tree_encoding: Encoding used for tree paths (default: utf-8)
2500 Returns: File system path.
2501 """
2502 assert isinstance(tree_path, bytes)
2503 if os_sep_bytes != b"/":
2504 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
2505 else:
2506 sep_corrected_path = tree_path
2508 # On Windows, we need to handle tree path encoding properly
2509 if sys.platform == "win32":
2510 # Decode from tree encoding, then re-encode for filesystem
2511 try:
2512 tree_path_str = sep_corrected_path.decode(tree_encoding)
2513 sep_corrected_path = os.fsencode(tree_path_str)
2514 except UnicodeDecodeError:
2515 # If decoding fails, use the original bytes
2516 pass
2518 return os.path.join(root_path, sep_corrected_path)
2521def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes:
2522 """Convert a file system path to a git tree path.
2524 Args:
2525 fs_path: File system path.
2526 tree_encoding: Encoding to use for tree paths (default: utf-8)
2528 Returns: Git tree path as bytes (encoded with tree_encoding)
2529 """
2530 if not isinstance(fs_path, bytes):
2531 fs_path_bytes = os.fsencode(fs_path)
2532 else:
2533 fs_path_bytes = fs_path
2535 # On Windows, we need to ensure tree paths are properly encoded
2536 if sys.platform == "win32":
2537 try:
2538 # Decode from filesystem encoding, then re-encode with tree encoding
2539 fs_path_str = os.fsdecode(fs_path_bytes)
2540 fs_path_bytes = fs_path_str.encode(tree_encoding)
2541 except UnicodeDecodeError:
2542 # If filesystem decoding fails, use the original bytes
2543 pass
2545 if os_sep_bytes != b"/":
2546 tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
2547 else:
2548 tree_path = fs_path_bytes
2549 return tree_path
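# Illustrative sketch (hypothetical demo; POSIX separators assumed): the two
# helpers are inverses. Tree paths always use b"/"; filesystem paths use the
# platform separator.
def _demo_path_roundtrip() -> None:
    root = b"/srv/repo"  # placeholder root
    fs_path = _tree_to_fs_path(root, b"docs/readme.md")
    assert fs_path == b"/srv/repo/docs/readme.md"
    assert _fs_to_tree_path(fs_path[len(root) + 1 :]) == b"docs/readme.md"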
2552def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
2553 """Create an index entry for a directory.
2555 This is only used for submodules (directories containing .git).
2557 Args:
2558 st: Stat result for the directory
2559 path: Path to the directory
2561 Returns:
2562 IndexEntry for a submodule, or None if not a submodule
2563 """
2564 if os.path.exists(os.path.join(path, b".git")):
2565 head = read_submodule_head(path)
2566 if head is None:
2567 return None
2568 return index_entry_from_stat(st, head, mode=S_IFGITLINK)
2569 return None
2572def index_entry_from_path(
2573 path: bytes, object_store: Optional[ObjectContainer] = None
2574) -> Optional[IndexEntry]:
2575 """Create an index from a filesystem path.
2577 This returns an index value for files, symlinks
2578 and tree references. for directories and
2579 non-existent files it returns None
2581 Args:
2582 path: Path to create an index entry for
2583 object_store: Optional object store to
2584 save new blobs in
    Returns: An index entry, or None for plain directories and unsupported types
2586 """
2587 assert isinstance(path, bytes)
2588 st = os.lstat(path)
2589 if stat.S_ISDIR(st.st_mode):
2590 return index_entry_from_directory(st, path)
2592 if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
2593 blob = blob_from_path_and_stat(path, st)
2594 if object_store is not None:
2595 object_store.add_object(blob)
2596 return index_entry_from_stat(st, blob.id)
2598 return None
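# Hedged usage sketch (hypothetical helper): stage a single on-disk file,
# storing its blob in the object store as a side effect of entry creation.
def _demo_stage_file(repo: "Repo", tree_path: bytes) -> None:
    root = os.fsencode(repo.path)  # os.fsencode leaves bytes unchanged
    full_path = _tree_to_fs_path(root, tree_path)
    entry = index_entry_from_path(full_path, object_store=repo.object_store)
    if entry is not None:
        index = repo.open_index()
        index[tree_path] = entry
        index.write()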
2601def iter_fresh_entries(
2602 paths: Iterable[bytes],
2603 root_path: bytes,
2604 object_store: Optional[ObjectContainer] = None,
2605) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
2606 """Iterate over current versions of index entries on disk.
2608 Args:
2609 paths: Paths to iterate over
2610 root_path: Root path to access from
2611 object_store: Optional store to save new blobs in
2612 Returns: Iterator over path, index_entry
2613 """
2614 for path in paths:
2615 p = _tree_to_fs_path(root_path, path)
2616 try:
2617 entry = index_entry_from_path(p, object_store=object_store)
2618 except (FileNotFoundError, IsADirectoryError):
2619 entry = None
2620 yield path, entry
2623def iter_fresh_objects(
2624 paths: Iterable[bytes],
2625 root_path: bytes,
2626 include_deleted: bool = False,
2627 object_store: Optional[ObjectContainer] = None,
2628) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
2629 """Iterate over versions of objects on disk referenced by index.
2631 Args:
2632 paths: Paths to check
2633 root_path: Root path to access from
2634 include_deleted: Include deleted entries with sha and
2635 mode set to None
2636 object_store: Optional object store to report new items to
2637 Returns: Iterator over path, sha, mode
2638 """
2639 for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
2640 if entry is None:
2641 if include_deleted:
2642 yield path, None, None
2643 else:
2644 yield path, entry.sha, cleanup_mode(entry.mode)
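# Hedged usage sketch (hypothetical helper): report the current on-disk
# sha/mode for every path tracked by the index, keeping deleted files as
# (path, None, None).
def _demo_fresh_objects(repo: "Repo") -> None:
    index = repo.open_index()
    for path, sha, mode in iter_fresh_objects(
        index, os.fsencode(repo.path), include_deleted=True
    ):
        print(path, sha, mode)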
2647def refresh_index(index: Index, root_path: bytes) -> None:
2648 """Refresh the contents of an index.
    This is the equivalent of the re-staging that 'git commit -a' performs:
    every file already tracked by the index is re-read from the working tree.
2652 Args:
2653 index: Index to update
2654 root_path: Root filesystem path
2655 """
2656 for path, entry in iter_fresh_entries(index, root_path):
2657 if entry:
2658 index[path] = entry
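# Hedged usage sketch (hypothetical helper): re-stat every tracked file and
# persist the refreshed entries.
def _demo_refresh(repo: "Repo") -> None:
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()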
2661class locked_index:
2662 """Lock the index while making modifications.
2664 Works as a context manager.
2665 """
2667 _file: "_GitFile"
2669 def __init__(self, path: Union[bytes, str]) -> None:
2670 """Initialize locked_index."""
2671 self._path = path
    def __enter__(self) -> Index:
        """Enter context manager and lock index."""
        from .file import _GitFile  # runtime import; the top-level one is TYPE_CHECKING-only

        f = GitFile(self._path, "wb")
        assert isinstance(f, _GitFile)  # GitFile in write mode always returns _GitFile
        self._file = f
        self._index = Index(self._path)
        return self._index
2681 def __exit__(
2682 self,
2683 exc_type: Optional[type],
2684 exc_value: Optional[BaseException],
2685 traceback: Optional[types.TracebackType],
2686 ) -> None:
2687 """Exit context manager and unlock index."""
2688 if exc_type is not None:
2689 self._file.abort()
2690 return
2691 try:
2692 f = SHA1Writer(self._file)
2693 write_index_dict(f, self._index._byname)
2694 except BaseException:
2695 self._file.abort()
2696 else:
2697 f.close()
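# Hedged usage sketch (hypothetical helper): mutations made inside the block
# are written out through SHA1Writer on a clean exit; an exception aborts the
# lock and leaves the previous index untouched.
def _demo_drop_entry(index_path: str, path: bytes) -> None:
    with locked_index(index_path) as index:
        if path in index:
            del index[path]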