# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
22"""Parser for the git index file format."""
24import errno
25import os
26import shutil
27import stat
28import struct
29import sys
30import types
31from collections.abc import Generator, Iterable, Iterator
32from dataclasses import dataclass
33from enum import Enum
34from typing import (
35 TYPE_CHECKING,
36 Any,
37 BinaryIO,
38 Callable,
39 Optional,
40 Union,
41 cast,
42)
44if TYPE_CHECKING:
45 from .file import _GitFile
46 from .line_ending import BlobNormalizer
47 from .repo import Repo
49from .file import GitFile
50from .object_store import iter_tree_contents
51from .objects import (
52 S_IFGITLINK,
53 S_ISGITLINK,
54 Blob,
55 ObjectID,
56 Tree,
57 hex_to_sha,
58 sha_to_hex,
59)
60from .pack import ObjectContainer, SHA1Reader, SHA1Writer
62# 2-bit stage (during merge)
63FLAG_STAGEMASK = 0x3000
64FLAG_STAGESHIFT = 12
65FLAG_NAMEMASK = 0x0FFF
67# assume-valid
68FLAG_VALID = 0x8000
70# extended flag (must be zero in version 2)
71FLAG_EXTENDED = 0x4000
73# used by sparse checkout
74EXTENDED_FLAG_SKIP_WORKTREE = 0x4000
76# used by "git add -N"
77EXTENDED_FLAG_INTEND_TO_ADD = 0x2000
79DEFAULT_VERSION = 2
81# Index extension signatures
82TREE_EXTENSION = b"TREE"
83REUC_EXTENSION = b"REUC"
84UNTR_EXTENSION = b"UNTR"
85EOIE_EXTENSION = b"EOIE"
86IEOT_EXTENSION = b"IEOT"
89def _encode_varint(value: int) -> bytes:
90 """Encode an integer using variable-width encoding.
92 Same format as used for OFS_DELTA pack entries and index v4 path compression.
93 Uses 7 bits per byte, with the high bit indicating continuation.
95 Args:
96 value: Integer to encode
97 Returns:
98 Encoded bytes
99 """
100 if value == 0:
101 return b"\x00"
103 result = []
104 while value > 0:
105 byte = value & 0x7F # Take lower 7 bits
106 value >>= 7
107 if value > 0:
108 byte |= 0x80 # Set continuation bit
109 result.append(byte)
111 return bytes(result)
114def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
115 """Decode a variable-width encoded integer.
117 Args:
118 data: Bytes to decode from
119 offset: Starting offset in data
120 Returns:
121 tuple of (decoded_value, new_offset)
122 """
123 value = 0
124 shift = 0
125 pos = offset
127 while pos < len(data):
128 byte = data[pos]
129 pos += 1
130 value |= (byte & 0x7F) << shift
131 shift += 7
132 if not (byte & 0x80): # No continuation bit
133 break
135 return value, pos
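

# A quick sketch of how the two varint helpers round-trip a value:
#
#     >>> _encode_varint(300)
#     b'\xac\x02'
#     >>> _decode_varint(b'\xac\x02')
#     (300, 2)
#
# 300 is 0b1_0010_1100; the low 7 bits (0x2C) are emitted first with the
# continuation bit (0x80) set, followed by the remaining bits (0x02).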


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
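

# A sketch of the v4 path-compression round-trip. The two paths share the
# 4-byte prefix b"src/", so 7 bytes are stripped from the end of the previous
# path and the new suffix b"util.py" is appended:
#
#     >>> _compress_path(b"src/util.py", b"src/main.py")
#     b'\x07util.py\x00'
#     >>> _decompress_path(b'\x07util.py\x00', 0, b"src/main.py")
#     (b'src/util.py', 9)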


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
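

# The merge stage lives in bits 12-13 of the flags word. A small sketch:
#
#     >>> flags = 0x2000  # stage bits = 0b10
#     >>> Stage((flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#     <Stage.MERGE_CONFLICT_THIS: 2>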


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)


@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
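

# write_cache_time normalizes ints and floats to (seconds, nanoseconds)
# before packing two big-endian 32-bit words, so the pair round-trips
# through read_cache_time:
#
#     >>> import io
#     >>> buf = io.BytesIO()
#     >>> write_cache_time(buf, 1234567890.5)
#     >>> buf.seek(0)
#     0
#     >>> read_cache_time(buf)
#     (1234567890, 500000000)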


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries


def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Entries are keyed by path; a path with merge conflicts maps to a
    ConflictedIndexEntry holding the individual stages.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require at least index format version 3.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Bump the version to 3
        version = 3
    if version < 3:
        # Safety check: extended flags cannot be represented before v3
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))

    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
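

# A sketch of how filesystem modes are normalized for storage in a tree:
# regular files collapse to 0o100644 or 0o100755 depending only on the
# owner-execute bit, and symlinks lose their permission bits entirely:
#
#     >>> oct(cleanup_mode(0o100664))
#     '0o100644'
#     >>> oct(cleanup_mode(0o100755))
#     '0o100755'
#     >>> oct(cleanup_mode(0o120777))
#     '0o120000'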


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=self._extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=self._extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
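

# A minimal usage sketch; the repository path is hypothetical:
#
#     from dulwich.repo import Repo
#
#     repo = Repo("/path/to/repo")   # hypothetical path
#     index = repo.open_index()      # returns an Index instance
#     for path in index:
#         entry = index[path]
#         if not isinstance(entry, ConflictedIndexEntry):
#             print(path, entry.sha, oct(entry.mode))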


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
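

# A sketch of committing a tree from (path, sha, mode) tuples into an
# in-memory store (MemoryObjectStore lives in dulwich.object_store):
#
#     from dulwich.object_store import MemoryObjectStore
#     from dulwich.objects import Blob
#
#     store = MemoryObjectStore()
#     blob = Blob.from_string(b"hello\n")
#     store.add_object(blob)
#     tree_id = commit_tree(store, [(b"docs/readme.txt", blob.id, 0o100644)])
#     # tree_id is the SHA of the root tree; an intermediate "docs" tree
#     # object was created and added to the store as well.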


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional mode to use; defaults to a cleaned-up stat_val.st_mode
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode symlink targets on Windows
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    return element.lower() not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    if stripped == b"git~1":
        return False
    return True
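

# NTFS stores 8.3 short names, so "GIT~1" can alias ".git", and Windows
# ignores trailing dots and spaces in names. A behaviour sketch:
#
#     >>> validate_path_element_ntfs(b".git. ")
#     False
#     >>> validate_path_element_ntfs(b"GIT~1")
#     False
#     >>> validate_path_element_default(b"GIT~1")
#     True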


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Validate a path by applying element_validator to each /-separated element.

    The default validator just refuses ".git" and relative path elements.
    """
    for p in path.split(b"/"):
        if not element_validator(p):
            return False
    return True


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it is a directory
    and a submodule, check the submodule head to see if it has changed. If
    it is not a submodule, consider the file changed, since Git tracked a
    file and not a directory.

    Return True if the given path should be considered changed and False
    otherwise or if the path is not a directory.
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Check if any parent in the path is a file
        parts = parent_dir.split(os_sep_bytes)
        for i in range(len(parts)):
            partial_path = os_sep_bytes.join(parts[: i + 1])
            if (
                partial_path
                and os.path.exists(partial_path)
                and not os.path.isdir(partial_path)
            ):
                # Parent path is a file, this is an error
                raise OSError(
                    f"Cannot create directory, parent path is a file: {partial_path!r}"
                )
        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store, entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink needs to be written, False if it matches.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target != expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return True
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return True
        raise


def _check_file_matches(
    repo_object_store,
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file needs to be written, False if it matches.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)
        if current_mode != expected_mode:
            return True

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return True

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content != expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return True


def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_file(
    object_store,
    path,
    full_path,
    current_stat,
    entry,
    index,
    honor_filemode,
    symlink_fn,
    blob_normalizer,
):
    """Transition any type to regular file or symlink."""
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        needs_update = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        needs_update = _check_symlink_matches(full_path, object_store, entry.sha)
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_absent(repo, path, full_path, current_stat, index):
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )


def update_working_tree(
    repo: "Repo",
    old_tree_id: Optional[bytes],
    new_tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Optional[Callable[[bytes], bool]] = None,
    symlink_fn: Optional[Callable] = None,
    force_remove_untracked: bool = False,
    blob_normalizer: Optional["BlobNormalizer"] = None,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      honor_filemode: An optional flag to honor core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in working
        directory but not in target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Build sets of paths for efficient lookup
    new_paths = {}
    for entry in iter_tree_contents(repo.object_store, new_tree_id):
        if entry.path.startswith(b".git") or not validate_path(
            entry.path, validate_path_element
        ):
            continue
        new_paths[entry.path] = entry

    old_paths = {}
    if old_tree_id:
        for entry in iter_tree_contents(repo.object_store, old_tree_id):
            if not entry.path.startswith(b".git"):
                old_paths[entry.path] = entry

    # Process all paths
    all_paths = set(new_paths.keys()) | set(old_paths.keys())

    # Check for paths that need to become directories
    paths_needing_dir = set()
    for path in new_paths:
        parts = path.split(b"/")
        for i in range(1, len(parts)):
            parent = b"/".join(parts[:i])
            if parent in old_paths and parent not in new_paths:
                paths_needing_dir.add(parent)

    # Check if any path that needs to become a directory has been modified
    current_stat: Optional[os.stat_result]
    stat_cache: dict[bytes, Optional[os.stat_result]] = {}
    for path in paths_needing_dir:
        full_path = _tree_to_fs_path(repo_path, path)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            # File doesn't exist, proceed
            stat_cache[full_path] = None
        except PermissionError:
            # Can't read file, proceed
            pass
        else:
            stat_cache[full_path] = current_stat
            if stat.S_ISREG(current_stat.st_mode):
                # Check if file has been modified
                old_entry = old_paths[path]
                if _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_entry.sha,
                    old_entry.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                ):
                    # File has been modified, can't replace with directory
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )

    # Process in two passes: deletions first, then additions/updates
    # This handles case-only renames on case-insensitive filesystems correctly
    paths_to_remove = []
    paths_to_update = []

    for path in sorted(all_paths):
        if path in new_paths:
            paths_to_update.append(path)
        else:
            paths_to_remove.append(path)

    # First process removals
    for path in paths_to_remove:
        full_path = _tree_to_fs_path(repo_path, path)

        # Determine current state - use cache if available
        try:
            current_stat = stat_cache[full_path]
        except KeyError:
            try:
                current_stat = os.lstat(full_path)
            except FileNotFoundError:
                current_stat = None

        _transition_to_absent(repo, path, full_path, current_stat, index)

    # Then process additions/updates
    for path in paths_to_update:
        full_path = _tree_to_fs_path(repo_path, path)

        # Determine current state - use cache if available
        try:
            current_stat = stat_cache[full_path]
        except KeyError:
            try:
                current_stat = os.lstat(full_path)
            except FileNotFoundError:
                current_stat = None

        new_entry = new_paths[path]

        # Path should exist
        if S_ISGITLINK(new_entry.mode):
            _transition_to_submodule(
                repo, path, full_path, current_stat, new_entry, index
            )
        else:
            _transition_to_file(
                repo.object_store,
                path,
                full_path,
                current_stat,
                new_entry,
                index,
                honor_filemode,
                symlink_fn,
                blob_normalizer,
            )

    # Handle force_remove_untracked
    if force_remove_untracked:
        for root, dirs, files in os.walk(repo_path):
            if b".git" in os.fsencode(root):
                continue
            root_bytes = os.fsencode(root)
            for file in files:
                full_path = os.path.join(root_bytes, os.fsencode(file))
                tree_path = os.path.relpath(full_path, repo_path)
                if os.sep != "/":
                    tree_path = tree_path.replace(os.sep.encode(), b"/")

                if tree_path not in new_paths:
                    _remove_file_with_readonly_handling(full_path)
                    if tree_path in index:
                        del index[tree_path]

    # Clean up empty directories
    for root, dirs, files in os.walk(repo_path, topdown=False):
        root_bytes = os.fsencode(root)
        if (
            b".git" not in root_bytes
            and root_bytes != repo_path
            and not files
            and not dirs
        ):
            try:
                os.rmdir(root)
            except OSError:
                pass

    index.write()


def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callback to filter blobs before comparison
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path


def _tree_to_fs_path(root_path: bytes, tree_path: bytes) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path
    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.

    Returns: Git tree path as bytes
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path
    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
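

# On POSIX, where os.sep is "/", these helpers reduce to a plain join:
#
#     >>> _tree_to_fs_path(b"/repo", b"docs/readme.txt")
#     b'/repo/docs/readme.txt'
#
# On Windows the "/" in tree paths is swapped with b"\\" in both directions.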


def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None


def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For directories and
    non-existent files it returns None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None


def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry


def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)


def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This re-stages the current on-disk contents of every tracked file,
    similar to the staging that "git commit -a" performs before committing.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry


class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self) -> Index:
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if exc_type is not None:
            self._file.abort()
            return
        try:
            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
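

# Usage sketch; the index path and the entry are hypothetical:
#
#     with locked_index(b"/path/to/repo/.git/index") as index:
#         del index[b"old-file.txt"]
#         index[b"new-file.txt"] = entry  # an IndexEntry built elsewhere
#
# On clean exit the new contents are written through a SHA1Writer and the
# lock file replaces the index; on error the lock is aborted and the
# original index is left untouched.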