# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
22"""Parser for the git index file format."""
24import errno
25import os
26import shutil
27import stat
28import struct
29import sys
30import types
31from collections.abc import Generator, Iterable, Iterator
32from dataclasses import dataclass
33from enum import Enum
34from typing import (
35 TYPE_CHECKING,
36 Any,
37 BinaryIO,
38 Callable,
39 Optional,
40 Union,
41 cast,
42)

if TYPE_CHECKING:
    from .file import _GitFile
    from .line_ending import BlobNormalizer
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
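

# Illustrative round-trip for the two varint helpers above (doctest-style;
# this example is an addition for clarity, not part of the upstream module):
#   >>> _encode_varint(300)
#   b'\xac\x02'
#   >>> _decode_varint(b'\xac\x02')
#   (300, 2)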


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
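

# Illustrative round-trip of the v4 path compression (an addition for
# clarity, not part of the upstream module): only the suffix that differs
# from the previous entry's path is stored.
#   >>> _compress_path(b"src/main_test.py", b"src/main.py")
#   b'\x03_test.py\x00'
#   >>> _decompress_path(b'\x03_test.py\x00', 0, b"src/main.py")
#   (b'src/main_test.py', 10)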


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
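

# Illustrative example of the stage bit layout (an addition for clarity, not
# part of the upstream module): flags 0x2000 carry stage 2, i.e. the "this"
# (ours) side of a merge conflict.
#   >>> (0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT
#   2
#   >>> Stage((0x2000 & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#   <Stage.MERGE_CONFLICT_THIS: 2>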


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)


@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
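

# Doctest-style examples for the two path helpers above (an addition for
# clarity, not part of the upstream module):
#   >>> pathsplit(b"foo/bar/baz.txt")
#   (b'foo/bar', b'baz.txt')
#   >>> pathsplit(b"baz.txt")
#   (b'', b'baz.txt')
#   >>> pathjoin(b"foo", b"", b"bar")   # empty components are dropped
#   b'foo/bar'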


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to the actual filename
        # length; this matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
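

# Worked example of the padding rule used above (an addition for clarity):
# the fixed part of a pre-v4 entry is 62 bytes (two 8-byte timestamps, six
# 4-byte fields, a 20-byte SHA and 2 bytes of flags). For a 1-byte name the
# entry spans (63 + 8) & ~7 == 64 bytes, so between 1 and 8 NUL bytes pad
# each entry to a multiple of 8 while also terminating the name.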


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
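

# Illustrative header round-trip (an addition for clarity; assumes
# `import io` in the caller):
#   >>> read_index_header(io.BytesIO(b"DIRC" + struct.pack(">LL", 2, 1)))
#   (2, 1)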


def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Entries are keyed by path; since a path alone is not unique while a merge
    conflict is recorded, the individual stages of a conflicted path are
    collected into a single ConflictedIndexEntry.

    Args:
      f: File object to read from
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # If any entry uses extended flags, the index must be at least version 3.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    # Sanity check: extended flags must never be written for versions < 3.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    sorting entries by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
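

# Doctest-style examples for cleanup_mode (an addition for clarity): group
# and other write bits are dropped, only the owner executable bit survives,
# and symlinks collapse to a bare S_IFLNK.
#   >>> oct(cleanup_mode(0o100664))
#   '0o100644'
#   >>> oct(cleanup_mode(0o100775))
#   '0o100755'
#   >>> oct(cleanup_mode(0o120777))
#   '0o120000'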


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
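

# Minimal usage sketch for the Index class (an illustrative addition; the
# index path is an assumption):
#   >>> index = Index(".git/index")                 # doctest: +SKIP
#   >>> for path in index:                          # doctest: +SKIP
#   ...     entry = index[path]
#   >>> index.get_sha1(b"README.md")                # doctest: +SKIP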


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
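

# Illustrative sketch of commit_tree with an in-memory store (an addition for
# clarity; MemoryObjectStore and Blob.from_string come from dulwich itself):
#   >>> from dulwich.object_store import MemoryObjectStore
#   >>> store = MemoryObjectStore()
#   >>> blob = Blob.from_string(b"hello\n")
#   >>> store.add_object(blob)
#   >>> tree_id = commit_tree(store, [(b"docs/hello.txt", blob.id, 0o100644)])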


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks
    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    if stripped == b"git~1":
        return False
    return True
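

# Doctest-style examples for the NTFS validator (an addition for clarity):
# NTFS equivalence rules mean trailing dots/spaces and the "GIT~1" 8.3 short
# name must also be rejected.
#   >>> validate_path_element_ntfs(b"GIT~1")
#   False
#   >>> validate_path_element_ntfs(b".git. ")
#   False
#   >>> validate_path_element_ntfs(b"README")
#   True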


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    import unicodedata

    try:
        # Decode to Unicode
        element_str = element.decode("utf-8", errors="strict")
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Remove HFS+ ignorable characters (like Git's next_hfs_char)
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)

    # Normalize to NFD (HFS+ uses a variant of NFD)
    normalized = unicodedata.normalize("NFD", filtered)

    # Check against invalid names (case-insensitive)
    normalized_bytes = normalized.encode("utf-8", errors="strict")
    if normalized_bytes.lower() in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized_bytes.lower() == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Validate a path by applying element_validator to each /-separated element.

    The default validator just refuses ".git" and dot-name elements.
    """
    parts = path.split(b"/")
    for p in parts:
        if not element_validator(p):
            return False
    return True
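

# Illustrative use of validate_path (an addition for clarity): every
# /-separated element must pass the element validator.
#   >>> validate_path(b"src/.git/config")
#   False
#   >>> validate_path(b"src/module.py")
#   True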


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed, since Git tracked a file and not a
    directory.

    Return True if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Check if any parent in the path is a file
        parts = parent_dir.split(os_sep_bytes)
        for i in range(len(parts)):
            partial_path = os_sep_bytes.join(parts[: i + 1])
            if (
                partial_path
                and os.path.exists(partial_path)
                and not os.path.isdir(partial_path)
            ):
                # Parent path is a file, this is an error
                raise OSError(
                    f"Cannot create directory, parent path is a file: {partial_path!r}"
                )
        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store, entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink needs to be written, False if it matches.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target != expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return True
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return True
        raise


def _check_file_matches(
    repo_object_store,
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file needs to be written, False if it matches.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)
        if current_mode != expected_mode:
            return True

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return True

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content != expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return True


def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_file(
    object_store,
    path,
    full_path,
    current_stat,
    entry,
    index,
    honor_filemode,
    symlink_fn,
    blob_normalizer,
):
    """Transition any type to regular file or symlink."""
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        needs_update = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        needs_update = _check_symlink_matches(full_path, object_store, entry.sha)
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_absent(repo, path, full_path, current_stat, index):
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )


def update_working_tree(
    repo: "Repo",
    old_tree_id: Optional[bytes],
    new_tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Optional[Callable[[bytes], bool]] = None,
    symlink_fn: Optional[Callable] = None,
    force_remove_untracked: bool = False,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      honor_filemode: An optional flag to honor core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in working
        directory but not in target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Build sets of paths for efficient lookup
    new_paths = {}
    for entry in iter_tree_contents(repo.object_store, new_tree_id):
        if entry.path.startswith(b".git") or not validate_path(
            entry.path, validate_path_element
        ):
            continue
        new_paths[entry.path] = entry

    old_paths = {}
    if old_tree_id:
        for entry in iter_tree_contents(repo.object_store, old_tree_id):
            if not entry.path.startswith(b".git"):
                old_paths[entry.path] = entry

    # Process all paths
    all_paths = set(new_paths.keys()) | set(old_paths.keys())

    # Check for paths that need to become directories
    paths_needing_dir = set()
    for path in new_paths:
        parts = path.split(b"/")
        for i in range(1, len(parts)):
            parent = b"/".join(parts[:i])
            if parent in old_paths and parent not in new_paths:
                paths_needing_dir.add(parent)

    # Check if any path that needs to become a directory has been modified
    current_stat: Optional[os.stat_result]
    stat_cache: dict[bytes, Optional[os.stat_result]] = {}
    for path in paths_needing_dir:
        full_path = _tree_to_fs_path(repo_path, path)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            # File doesn't exist, proceed
            stat_cache[full_path] = None
        except PermissionError:
            # Can't read file, proceed
            pass
        else:
            stat_cache[full_path] = current_stat
            if stat.S_ISREG(current_stat.st_mode):
                # Check if file has been modified
                old_entry = old_paths[path]
                if _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_entry.sha,
                    old_entry.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                ):
                    # File has been modified, can't replace with directory
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )
1904 # Process in two passes: deletions first, then additions/updates
1905 # This handles case-only renames on case-insensitive filesystems correctly
1906 paths_to_remove = []
1907 paths_to_update = []
1909 for path in sorted(all_paths):
1910 if path in new_paths:
1911 paths_to_update.append(path)
1912 else:
1913 paths_to_remove.append(path)
1915 # First process removals
1916 for path in paths_to_remove:
1917 full_path = _tree_to_fs_path(repo_path, path)
1919 # Determine current state - use cache if available
1920 try:
1921 current_stat = stat_cache[full_path]
1922 except KeyError:
1923 try:
1924 current_stat = os.lstat(full_path)
1925 except FileNotFoundError:
1926 current_stat = None
1928 _transition_to_absent(repo, path, full_path, current_stat, index)
1930 # Then process additions/updates
1931 for path in paths_to_update:
1932 full_path = _tree_to_fs_path(repo_path, path)
1934 # Determine current state - use cache if available
1935 try:
1936 current_stat = stat_cache[full_path]
1937 except KeyError:
1938 try:
1939 current_stat = os.lstat(full_path)
1940 except FileNotFoundError:
1941 current_stat = None
1943 new_entry = new_paths[path]
1945 # Path should exist
1946 if S_ISGITLINK(new_entry.mode):
1947 _transition_to_submodule(
1948 repo, path, full_path, current_stat, new_entry, index
1949 )
1950 else:
1951 _transition_to_file(
1952 repo.object_store,
1953 path,
1954 full_path,
1955 current_stat,
1956 new_entry,
1957 index,
1958 honor_filemode,
1959 symlink_fn,
1960 blob_normalizer,
1961 )
1963 # Handle force_remove_untracked
1964 if force_remove_untracked:
1965 for root, dirs, files in os.walk(repo_path):
1966 if b".git" in os.fsencode(root):
1967 continue
1968 root_bytes = os.fsencode(root)
1969 for file in files:
1970 full_path = os.path.join(root_bytes, os.fsencode(file))
1971 tree_path = os.path.relpath(full_path, repo_path)
1972 if os.sep != "/":
1973 tree_path = tree_path.replace(os.sep.encode(), b"/")
1975 if tree_path not in new_paths:
1976 _remove_file_with_readonly_handling(full_path)
1977 if tree_path in index:
1978 del index[tree_path]
1980 # Clean up empty directories
1981 for root, dirs, files in os.walk(repo_path, topdown=False):
1982 root_bytes = os.fsencode(root)
1983 if (
1984 b".git" not in root_bytes
1985 and root_bytes != repo_path
1986 and not files
1987 and not dirs
1988 ):
1989 try:
1990 os.rmdir(root)
1991 except FileNotFoundError:
1992 # Directory was already removed
1993 pass
1994 except OSError as e:
1995 if e.errno != errno.ENOTEMPTY:
1996 # Only ignore "directory not empty" errors
1997 raise
1999 index.write()
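

# Editorial sketch (not part of dulwich): the two-pass ordering above matters
# on case-insensitive filesystems. A case-only rename such as b"README" ->
# b"ReadMe" produces one removal and one addition that resolve to the same
# on-disk file; if additions ran first, the later removal of the old name
# would delete the file that was just written. The toy model below keys a
# fake filesystem on lower-cased names to demonstrate the correct order.
def _example_case_insensitive_two_pass() -> None:
    fs: dict[bytes, bytes] = {b"readme": b"old contents"}

    paths_to_remove = [b"README"]
    paths_to_update = [b"ReadMe"]

    # Removals first, then additions - mirroring the passes above.
    for p in paths_to_remove:
        fs.pop(p.lower(), None)
    for p in paths_to_update:
        fs[p.lower()] = b"new contents"

    assert fs == {b"readme": b"new contents"}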


def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: optional callback applied to each blob before
        it is compared against the index entry
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index, compare the working-tree blob's sha1
    # against the recorded one.
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path
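

# Editorial sketch (not part of dulwich): typical use of get_unstaged_changes,
# assuming a repository exists at repo_path.
def _example_list_unstaged(repo_path: str = ".") -> None:
    from dulwich.repo import Repo

    repo = Repo(repo_path)
    index = repo.open_index()
    for tree_path in get_unstaged_changes(index, repo.path):
        print(tree_path.decode("utf-8", "replace"))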


def _tree_to_fs_path(root_path: bytes, tree_path: bytes) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes
    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path
    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.
    Returns: Git tree path as bytes
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path
    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
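

# Editorial sketch (not part of dulwich): the two helpers above only swap
# separators, so converting a joined filesystem path back yields the same
# b"/"-separated form on every platform. The paths used here are made up.
def _example_path_round_trip() -> None:
    tree_path = b"docs/readme.md"
    fs_path = _tree_to_fs_path(b"repo", tree_path)
    # fs_path is b"repo/docs/readme.md" on POSIX,
    # b"repo\\docs\\readme.md" on Windows.
    assert _fs_to_tree_path(fs_path) == b"repo/docs/readme.md"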


def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a submodule directory; plain directories yield None."""
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None


def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For directories and
    non-existent files it returns None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None
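

# Editorial sketch (not part of dulwich): creating an index entry for a
# working-tree file while capturing its blob in an in-memory store. The
# file name b"example.txt" is hypothetical and must exist for this to work.
def _example_entry_from_path() -> None:
    from dulwich.object_store import MemoryObjectStore

    store = MemoryObjectStore()
    entry = index_entry_from_path(b"example.txt", object_store=store)
    if entry is not None:
        # The blob is now retrievable from the store by entry.sha
        print(entry.mode, entry.sha)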


def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry
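

# Editorial sketch (not part of dulwich): probing the current on-disk state
# of a few tree paths relative to a root directory. The file names here are
# made up; missing files come back with entry None.
def _example_fresh_entries() -> None:
    for path, entry in iter_fresh_entries([b"setup.py", b"missing.txt"], b"."):
        status = "absent" if entry is None else entry.sha.decode("ascii")
        print(path.decode(), status)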


def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)
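

# Editorial sketch (not part of dulwich): with include_deleted=True, missing
# files are reported as (path, None, None) instead of being skipped. The
# paths are hypothetical.
def _example_fresh_objects() -> None:
    for path, sha, mode in iter_fresh_objects(
        [b"setup.py", b"missing.txt"], b".", include_deleted=True
    ):
        print(path, sha, mode)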


def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This is the equivalent of the index refresh that 'git commit -a'
    performs before committing: each entry is updated from the working tree.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry
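

# Editorial sketch (not part of dulwich): refreshing a repository's index
# from the working tree and persisting the result. Assumes a repository
# exists at repo_path.
def _example_refresh(repo_path: str = ".") -> None:
    from dulwich.repo import Repo

    repo = Repo(repo_path)
    index = repo.open_index()
    refresh_index(index, os.fsencode(repo.path))
    index.write()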


class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self) -> Index:
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if exc_type is not None:
            self._file.abort()
            return
        try:
            # BinaryIO and cast are already imported at module level
            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
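

# Editorial sketch (not part of dulwich): locked_index takes the path to the
# index file itself (usually <repo>/.git/index), holds the lock for the
# duration of the block, and writes the result on a clean exit. The file
# name b"example.txt" is hypothetical.
def _example_locked_update(repo_path: str = ".") -> None:
    from dulwich.repo import Repo

    repo = Repo(repo_path)
    index_path = os.path.join(repo.controldir(), "index")
    with locked_index(index_path) as index:
        entry = index_entry_from_path(b"example.txt")  # assumes the file exists
        if entry is not None:
            index[b"example.txt"] = entry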