Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/index.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# index.py -- File parser/writer for the git index file
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Parser for the git index file format."""
24import errno
25import os
26import shutil
27import stat
28import struct
29import sys
30import types
31from collections.abc import Generator, Iterable, Iterator
32from dataclasses import dataclass
33from enum import Enum
34from typing import (
35 TYPE_CHECKING,
36 Any,
37 BinaryIO,
38 Callable,
39 Optional,
40 Union,
41 cast,
42)
44if TYPE_CHECKING:
45 from .config import Config
46 from .diff_tree import TreeChange
47 from .file import _GitFile
48 from .line_ending import BlobNormalizer
49 from .object_store import BaseObjectStore
50 from .repo import Repo
52from .file import GitFile
53from .object_store import iter_tree_contents
54from .objects import (
55 S_IFGITLINK,
56 S_ISGITLINK,
57 Blob,
58 ObjectID,
59 Tree,
60 hex_to_sha,
61 sha_to_hex,
62)
63from .pack import ObjectContainer, SHA1Reader, SHA1Writer
# Bit masks for the 16-bit per-entry flags word.
# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
# Lower 12 bits of the flags word hold the entry name length.
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# Bits of the second (extended) flags word:
# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

# Index format version written when the caller does not specify one.
DEFAULT_VERSION = 2

# Index extension signatures (4 uppercase ASCII bytes each).
TREE_EXTENSION = b"TREE"  # cached tree
REUC_EXTENSION = b"REUC"  # resolve undo
UNTR_EXTENSION = b"UNTR"  # untracked cache
EOIE_EXTENSION = b"EOIE"  # end of index entry
IEOT_EXTENSION = b"IEOT"  # index entry offset table
92def _encode_varint(value: int) -> bytes:
93 """Encode an integer using variable-width encoding.
95 Same format as used for OFS_DELTA pack entries and index v4 path compression.
96 Uses 7 bits per byte, with the high bit indicating continuation.
98 Args:
99 value: Integer to encode
100 Returns:
101 Encoded bytes
102 """
103 if value == 0:
104 return b"\x00"
106 result = []
107 while value > 0:
108 byte = value & 0x7F # Take lower 7 bits
109 value >>= 7
110 if value > 0:
111 byte |= 0x80 # Set continuation bit
112 result.append(byte)
114 return bytes(result)
117def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
118 """Decode a variable-width encoded integer.
120 Args:
121 data: Bytes to decode from
122 offset: Starting offset in data
123 Returns:
124 tuple of (decoded_value, new_offset)
125 """
126 value = 0
127 shift = 0
128 pos = offset
130 while pos < len(data):
131 byte = data[pos]
132 pos += 1
133 value |= (byte & 0x7F) << shift
134 shift += 7
135 if not (byte & 0x80): # No continuation bit
136 break
138 return value, pos
def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Length of the longest shared leading byte sequence.
    shared = 0
    for ours, theirs in zip(path, previous_path):
        if ours != theirs:
            break
        shared += 1

    # Number of trailing bytes the reader must strip from previous_path
    # to recover the shared prefix.
    strip_count = len(previous_path) - shared

    # On-disk layout: varint(strip_count) + new suffix + NUL terminator.
    return _encode_varint(strip_count) + path[shared:] + b"\x00"
def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Number of trailing bytes to strip from the previous path.
    remove_len, cursor = _decode_varint(data, offset)

    # The suffix runs up to the next NUL byte.
    nul_index = data.find(b"\x00", cursor)
    if nul_index == -1:
        raise ValueError("Unterminated path suffix in compressed entry")
    suffix = data[cursor:nul_index]
    cursor = nul_index + 1  # Skip the NUL terminator

    # Rebuild the path: retained prefix of the previous path + new suffix.
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    return prefix + suffix, cursor
210def _decompress_path_from_stream(
211 f: BinaryIO, previous_path: bytes
212) -> tuple[bytes, int]:
213 """Decompress a path from index version 4 compressed format, reading from stream.
215 Args:
216 f: File-like object to read from
217 previous_path: Previous path for decompression
218 Returns:
219 tuple of (decompressed_path, bytes_consumed)
220 """
221 # Decode the varint for remove_len by reading byte by byte
222 remove_len = 0
223 shift = 0
224 bytes_consumed = 0
226 while True:
227 byte_data = f.read(1)
228 if not byte_data:
229 raise ValueError("Unexpected end of file while reading varint")
230 byte = byte_data[0]
231 bytes_consumed += 1
232 remove_len |= (byte & 0x7F) << shift
233 shift += 7
234 if not (byte & 0x80): # No continuation bit
235 break
237 # Read the suffix until NUL terminator
238 suffix = b""
239 while True:
240 byte_data = f.read(1)
241 if not byte_data:
242 raise ValueError("Unexpected end of file while reading path suffix")
243 byte = byte_data[0]
244 bytes_consumed += 1
245 if byte == 0: # NUL terminator
246 break
247 suffix += bytes([byte])
249 # Reconstruct the path
250 if remove_len > len(previous_path):
251 raise ValueError(
252 f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
253 )
255 prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
256 path = prefix + suffix
258 return path, bytes_consumed
class Stage(Enum):
    """Merge stage of an index entry, stored in the FLAG_STAGEMASK bits."""

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
@dataclass
class SerializedIndexEntry:
    """An index entry in on-disk form.

    Unlike IndexEntry, this carries the entry name plus the flags and
    extended_flags words exactly as they are serialized in the index file.
    """

    name: bytes
    # ctime/mtime may be an int, a float, or a (seconds, nanoseconds) tuple.
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Return the merge stage encoded in the flags word."""
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        # Dispatch known signatures to their dedicated subclass; anything
        # else is kept around as an opaque raw-data extension.
        known_parsers = {
            TREE_EXTENSION: TreeExtension,
            REUC_EXTENSION: ResolveUndoExtension,
            UNTR_EXTENSION: UntrackedExtension,
        }
        parser = known_parsers.get(signature)
        if parser is not None:
            return parser.from_bytes(data)
        return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data
class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        # entries: 3-tuples of bytes/bytes/int — presumably
        # (path, tree sha, entry count); TODO confirm once parsing exists.
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        # Currently discards the payload and returns an empty extension.
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        # Currently serializes to an empty payload.
        return b""
class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        # entries: (path, [(mode, sha), ...]) pairs — presumably one
        # (mode, sha) per merge stage; TODO confirm once parsing exists.
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        # Currently discards the payload and returns an empty extension.
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        # Currently serializes to an empty payload.
        return b""
class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        # The payload is stored opaquely; no parsing is performed.
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        """Construct directly from the raw extension payload."""
        return cls(data)
@dataclass
class IndexEntry:
    """An index entry in parsed, in-memory form.

    The entry's path is not stored here; it is the key under which the
    entry lives in the Index mapping (see SerializedIndexEntry for the
    on-disk form that includes the name).
    """

    # ctime/mtime may be an int, a float, or a (seconds, nanoseconds) tuple.
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Build an IndexEntry from a SerializedIndexEntry (drops the name)."""
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Return the on-disk form of this entry for *name* at *stage*."""
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        """Return the merge stage encoded in the flags word."""
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.
        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED
class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    # One entry per merge stage; None when that stage is absent.
    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        """Initialize with up to three stage entries (ancestor/this/other)."""
        self.ancestor = ancestor
        self.this = this
        self.other = other
class UnmergedEntries(Exception):
    """Unmerged entries exist in the index.

    Raised by operations that require a fully merged index when at least
    one path still holds a ConflictedIndexEntry.
    """
def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    # rpartition yields (b"", b"", path) when no slash is present, which
    # collapses to the same (b"", path) result as the original try/except.
    dirname, _sep, basename = path.rpartition(b"/")
    return (dirname, basename)
def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path, dropping empty components."""
    return b"/".join(filter(None, args))
def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    # Two big-endian unsigned 32-bit values: seconds, then nanoseconds.
    seconds, nanoseconds = struct.unpack(">LL", f.read(8))
    return seconds, nanoseconds
def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        secs, nsecs = t, 0
    elif isinstance(t, float):
        # Split into whole seconds plus a nanosecond remainder.
        whole, fraction = divmod(t, 1.0)
        secs, nsecs = int(whole), int(fraction * 1000000000)
    elif isinstance(t, tuple):
        secs, nsecs = t
    else:
        raise TypeError(t)
    f.write(struct.pack(">LL", secs, nsecs))
def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    Returns:
      A SerializedIndexEntry; its flags word has the name-length bits
      stripped out.
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    # Fixed-size stat portion: six 32-bit fields, the 20-byte SHA, and the
    # 16-bit flags word.
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        # A second 16-bit flags word follows; only legal from version 3 on.
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        # Pre-v4 entries are NUL-padded so each entry's length is a
        # multiple of 8; skip over the padding to reach the next entry.
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,  # drop the name-length bits from the flags
        extended_flags,
    )
def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    # NOTE(review): `version is not None` is always true — version is a
    # required int parameter — so this condition reduces to `version < 3`.
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            # dev/ino can exceed 32 bits on some systems; truncate to fit
            # the fixed-width on-disk fields.
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        # (NUL-pad so the entry length is a multiple of 8 bytes).
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        # Pass a message to Exception so str(exc) is informative;
        # previously the exception carried no message at all.
        super().__init__(f"Unsupported index format version: {version}")
        # Keep the raw version available for programmatic inspection.
        self.index_format_version = version
def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    # The index starts with the 4-byte magic "DIRC" ("dircache").
    magic = f.read(4)
    if magic != b"DIRC":
        raise AssertionError(f"Invalid index file header: {magic!r}")
    # Followed by two big-endian 32-bit values: format version and entry count.
    version, num_entries = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    # On-disk layout: 4-byte signature, big-endian 32-bit payload length,
    # then the payload itself.
    payload = extension.to_bytes()
    header = extension.signature + struct.pack(">I", len(payload))
    f.write(header + payload)
def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    # Index v4 delta-compresses each path against the preceding entry's
    # path, so thread the last name through the loop.
    last_name = b""
    for _ in range(num_entries):
        entry = read_cache_entry(f, version, last_name)
        last_name = entry.name
        yield entry
def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Conflicted paths (non-zero merge stage) are folded into
    ConflictedIndexEntry objects keyed by path.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            # Conflicted path: collect the stages into one ConflictedIndexEntry.
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions
def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary keyed by path.

    Entries at a non-zero merge stage are folded into ConflictedIndexEntry
    objects under their path.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
            continue
        existing = ret.setdefault(entry.name, ConflictedIndexEntry())
        if isinstance(existing, IndexEntry):
            raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
        parsed = IndexEntry.from_serialized(entry)
        if stage == Stage.MERGE_CONFLICT_ANCESTOR:
            existing.ancestor = parsed
        elif stage == Stage.MERGE_CONFLICT_THIS:
            existing.this = parsed
        elif stage == Stage.MERGE_CONFLICT_OTHER:
            existing.other = parsed
    return ret
def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write
      entries: Iterable over the entries to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # Extended flags require format version >= 3; bump the version
    # automatically if any entry uses them.
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        version = 3
    # Defensive re-check: a caller-supplied version < 3 must not carry
    # extended flags past this point.
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Header, then all entries (v4 compresses each path against the
    # previous entry's path).
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)
def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file from a dictionary of entries.

    Entries are sorted by path; conflicted paths are expanded into their
    individual stages in ascending stage order.
    """
    serialized: list[SerializedIndexEntry] = []
    for path in sorted(entries):
        entry = entries[path]
        if isinstance(entry, ConflictedIndexEntry):
            # Emit present conflict stages in stage order: 1, 2, 3.
            stage_slots = (
                (entry.ancestor, Stage.MERGE_CONFLICT_ANCESTOR),
                (entry.this, Stage.MERGE_CONFLICT_THIS),
                (entry.other, Stage.MERGE_CONFLICT_OTHER),
            )
            for stage_entry, stage in stage_slots:
                if stage_entry is not None:
                    serialized.append(stage_entry.serialize(path, stage))
        else:
            serialized.append(entry.serialize(path, Stage.NORMAL))

    write_index(f, serialized, version=version, extensions=extensions)
def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    # Regular file: git trees only store 644 or 755, keyed off the
    # owner-executable bit of the original mode.
    is_executable = bool(mode & 0o100)
    return stat.S_IFREG | (0o755 if is_executable else 0o644)
class Index:
    """A Git Index file."""

    # Maps path -> entry; conflicted paths hold a ConflictedIndexEntry.
    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        """Path of the index file on disk."""
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        # NOTE(review): redundant with the module-level typing imports;
        # kept as-is (doc-only change).
        from typing import BinaryIO, cast

        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        # Bare except is deliberate: close the file on *any* error
        # (including BaseException) and re-raise.
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        """Return True if *key* (a path) has an entry in this index."""
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Return True if any path in the index is conflicted."""
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        """Store *value* under path *name* (name must be bytes)."""
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        """Remove the entry for path *name*."""
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs (legacy alias for items())."""
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs."""
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        """Add or replace entries from *entries* in this index."""
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        """Yield all paths stored in this index."""
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            # Duck-typed check: ConflictedIndexEntry has no sha/mode.
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Nested-dict model of the tree under construction: each directory dict
    # maps basenames either to a nested dict (subdirectory) or a
    # (mode, sha) tuple (blob).
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        # Return the dict for directory *path*, creating it and any missing
        # parents on demand (recursing toward the root).
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        # Serialize the nested dicts bottom-up, adding every Tree object to
        # the store and returning the root tree's id.
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    # Thin wrapper kept for backwards compatibility with older callers.
    return commit_tree(object_store, index.iterobjects())
def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option
    unseen = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                other_sha, other_mode = lookup_entry(name)
            except KeyError:
                # In the tree but not in the working copy: deleted.
                yield ((name, None), (mode, None), (sha, None))
            else:
                unseen.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Anything never matched against the tree is an addition.
    for name in unseen:
        try:
            other_sha, other_mode = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))
def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode; derived from the stat value when omitted
    """
    entry_mode = cleanup_mode(stat_val.st_mode) if mode is None else mode

    # Flags and extended flags start out cleared for a fresh entry.
    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=entry_mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )
if sys.platform != "win32":
    symlink = os.symlink
else:
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """PermissionError carrying a hint about Windows developer mode."""

        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            # Deliberately skip PermissionError in the MRO and invoke
            # OSError.__init__ with the augmented message.
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        """os.symlink wrapper that raises a more helpful permission error."""
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode symlink targets on Windows
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        # Nothing at the target path yet.
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            # Symlinks cannot be rewritten in place; remove the old entry first.
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            # Fast path: if size and bytes already match, skip the write.
            # NOTE(review): this returns without re-applying `mode`, so a
            # change that only flips the executable bit is not written by
            # this fast path — confirm callers handle mode-only changes.
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)
# Path elements that must never appear (after normalization) in a checked-out
# path: the .git directory, dot navigation entries, and empty segments.
INVALID_DOTNAMES = (b".git", b".", b"..", b"")
def _normalize_path_element_default(element: bytes) -> bytes:
    """Lower-case a path element for case-insensitive comparison."""
    return element.lower()
def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize a path element the way NTFS compares names.

    NTFS ignores trailing dots and spaces and is case-insensitive.
    """
    stripped = element.rstrip(b". ")
    return stripped.lower()
def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize a path element the way HFS+ compares names.

    Raises UnicodeDecodeError for byte sequences that are not valid UTF-8.
    """
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    decoded = element.decode("utf-8", errors="strict")
    # Drop the codepoints HFS+ treats as ignorable, then NFD-normalize.
    kept = [ch for ch in decoded if ord(ch) not in HFS_IGNORABLE_CHARS]
    normalized = unicodedata.normalize("NFD", "".join(kept))
    return normalized.lower().encode("utf-8", errors="strict")
def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    import os
    import sys

    # core.protectNTFS defaults to on when running on Windows itself.
    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    # core.protectHFS defaults to on when running on macOS.
    if config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    return _normalize_path_element_default
def validate_path_element_default(element: bytes) -> bool:
    """Reject elements that case-fold to .git, ., .. or the empty name."""
    normalized = _normalize_path_element_default(element)
    return normalized not in INVALID_DOTNAMES
def validate_path_element_ntfs(element: bytes) -> bool:
    """Reject elements that are dangerous under NTFS name folding.

    Covers the .git family plus the 8.3 short name for .git (git~1).
    """
    normalized = _normalize_path_element_ntfs(element)
    return normalized not in INVALID_DOTNAMES and normalized != b"git~1"
# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
# These are stripped from path elements before comparison, because HFS+
# ignores them when folding filenames.
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}
def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False
    # Reject .git-like names and the 8.3 short name for .git.
    return normalized not in INVALID_DOTNAMES and normalized != b"git~1"
def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Default path validator that just checks for .git/."""
    return all(element_validator(part) for part in path.split(b"/"))
def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        # Skip entries whose path contains unsafe elements (e.g. ".git").
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            # Submodules are materialized as (possibly empty) directories.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from the file at a path, given its mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode (used to distinguish symlinks from regular files)
      tree_encoding: Encoding used for symlink targets on Windows
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if not stat.S_ISLNK(mode):
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    elif sys.platform == "win32":
        # os.readlink on Python3 on Windows requires a unicode string.
        blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
    else:
        blob.data = os.readlink(fs_path)
    return blob
def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    # Delegate to the mode-based variant; only st_mode is consulted.
    mode = st.st_mode
    return blob_from_path_and_mode(fs_path, mode, tree_encoding)
def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        # No HEAD ref yet (e.g. freshly initialized repository).
        return None
def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it's has changed. If
    not, consider the file as changed as Git tracked a file and not a
    directory.

    Return true if the given path should be considered as changed and False
    otherwise or if the path is not a directory.
    """
    if not os.path.exists(os.path.join(tree_path, b".git")):
        # The file was changed to a directory, so consider it removed.
        return True
    # Submodule: changed only when its HEAD no longer matches the index sha.
    head = read_submodule_head(tree_path)
    return entry.sha != head
# os.sep as bytes, for splitting/joining byte-string filesystem paths.
os_sep_bytes = os.sep.encode("ascii")
def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if not parent_dir or os.path.exists(parent_dir):
        return

    # Collect the chain of missing ancestors, shallowest first.
    parents_to_check: list[bytes] = []
    current = parent_dir
    while current and not os.path.exists(current):
        parents_to_check.insert(0, current)
        new_parent = os.path.dirname(current)
        if new_parent == current:
            # Reached the root or can't go up further
            break
        current = new_parent

    # The first existing ancestor (if any) must itself be a directory.
    if current and os.path.exists(current) and not os.path.isdir(current):
        raise OSError(
            f"Cannot create directory, parent path is a file: {current!r}"
        )

    # None of the components we are about to create may be blocked by a file.
    for parent_path in parents_to_check:
        if os.path.exists(parent_path) and not os.path.isdir(parent_path):
            raise OSError(
                f"Cannot create directory, parent path is a file: {parent_path!r}"
            )

    os.makedirs(parent_dir)
def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        if sys.platform != "win32":
            raise
        # On Windows, remove read-only attribute and retry
        os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
        os.unlink(path)
def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    current = os.path.dirname(path)
    while current and current != stop_at:
        try:
            os.rmdir(current)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            return
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                return
            raise
        current = os.path.dirname(current)
def _check_symlink_matches(
    full_path: bytes, repo_object_store: "BaseObjectStore", entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink matches, False if it doesn't match.
    """
    try:
        current_target = os.readlink(full_path)
        expected_target = repo_object_store[entry_sha].as_raw_string()
        if isinstance(current_target, str):
            # Normalize to bytes before comparing with the blob contents.
            current_target = current_target.encode()
        return current_target == expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return False
        raise
def _check_file_matches(
    repo_object_store: "BaseObjectStore",
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Args:
      repo_object_store: Object store holding the expected blob
      full_path: Filesystem path of the file to compare
      entry_sha: SHA of the expected blob
      entry_mode: Git mode recorded for the entry
      current_stat: lstat result for the file on disk
      honor_filemode: Whether to compare modes in addition to content
      blob_normalizer: Optional normalizer applied to the expected blob
        before comparing content (e.g. line-ending conversion)
      tree_path: Path of the entry in the tree; required for normalization

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                assert isinstance(blob_obj, Blob)
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        # File vanished, became unreadable, or is a directory: mismatch.
        return False
def _transition_to_submodule(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    entry: IndexEntry,
    index: Index,
) -> None:
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    already_dir = current_stat is not None and stat.S_ISDIR(current_stat.st_mode)
    if not already_dir and current_stat is not None:
        # Something non-directory is in the way; remove it first.
        _remove_file_with_readonly_handling(full_path)
    # Create (or confirm) the placeholder directory with its .git file.
    ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)
def _transition_to_file(
    object_store: "BaseObjectStore",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    entry: IndexEntry,
    index: Index,
    honor_filemode: bool,
    symlink_fn: Optional[Callable[[bytes, bytes], None]],
    blob_normalizer: Optional["BlobNormalizer"],
    tree_encoding: str = "utf-8",
) -> None:
    """Transition any type to regular file or symlink.

    Args:
      object_store: Object store holding the target blob
      path: Path of the entry within the tree
      full_path: Absolute filesystem path to write to
      current_stat: lstat result for whatever currently exists at
        full_path, or None if nothing exists there
      entry: Index entry describing the desired file
      index: Index to record the new entry in
      honor_filemode: Whether to apply the entry's mode to the file
      symlink_fn: Function to use for creating symlinks
      blob_normalizer: Optional normalizer for line-ending conversion
      tree_encoding: Encoding used for tree paths (default: utf-8)
    """
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        file_matches = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
        needs_update = not file_matches
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
        needs_update = not symlink_matches
    else:
        # Type change (or nothing on disk): always (re)write.
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        assert current_stat is not None
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            # Only remove a submodule placeholder that holds nothing but
            # its .git file; anything else would destroy untracked data.
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)
def _transition_to_absent(
    repo: "Repo",
    path: bytes,
    full_path: bytes,
    current_stat: Optional[os.stat_result],
    index: Index,
) -> None:
    """Remove any type of entry."""
    if current_stat is None:
        return

    if not stat.S_ISDIR(current_stat.st_mode):
        _remove_file_with_readonly_handling(full_path)
    else:
        # A directory containing only a .git file is a submodule
        # placeholder and may be removed wholesale; otherwise only try
        # to remove it if it is empty.
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"
        dir_contents = set(os.listdir(full_path))
        if dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )
def detect_case_only_renames(
    changes: list["TreeChange"],
    config: "Config",
) -> list["TreeChange"]:
    """Detect and transform case-only renames in a list of tree changes.

    This function identifies file renames that only differ in case (e.g.,
    README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
    CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
    based on the repository configuration.

    Args:
      changes: List of TreeChange objects representing file changes
      config: Repository configuration object

    Returns:
      New list of TreeChange objects with case-only renames converted to CHANGE_RENAME
    """
    import logging

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        TreeChange,
    )

    # Get the appropriate normalizer based on config
    normalize_func = get_path_element_normalizer(config)

    def normalize_path(path: bytes) -> bytes:
        """Normalize entire path using element normalization."""
        return b"/".join(normalize_func(part) for part in path.split(b"/"))

    def try_normalize_path(path: bytes) -> Optional[bytes]:
        """Normalize a path, or return None (with a warning) on invalid UTF-8."""
        try:
            return normalize_path(path)
        except UnicodeDecodeError:
            logging.warning(
                "Skipping case-only rename detection for path with invalid UTF-8: %r",
                path,
            )
            return None

    # Build dictionaries of old and new paths with their normalized forms
    old_paths_normalized = {}
    new_paths_normalized = {}
    old_changes = {}  # Map from old path to change object
    new_changes = {}  # Map from new path to change object

    # Pre-normalize all paths once to avoid repeated normalization
    for change in changes:
        # DELETE and RENAME both vacate an old path (a RENAME is treated as
        # DELETE + ADD for case-only detection).
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            normalized = try_normalize_path(change.old.path)
            if normalized is not None:
                old_paths_normalized[normalized] = change.old.path
                old_changes[change.old.path] = change

        if (
            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
            and change.new
        ):
            normalized = try_normalize_path(change.new.path)
            if normalized is not None:
                new_paths_normalized[normalized] = change.new.path
                new_changes[change.new.path] = change

    # Find case-only renames and transform changes
    case_only_renames = set()
    new_rename_changes = []

    for norm_path, old_path in old_paths_normalized.items():
        new_path = new_paths_normalized.get(norm_path)
        if new_path is None or old_path == new_path:
            continue
        # Found a case-only rename: replace the DELETE (or RENAME) and
        # ADD/MODIFY pair with a single CHANGE_RENAME carrying the old
        # entry from the vacating change and the new entry from the
        # occupying change.
        old_change = old_changes[old_path]
        new_change = new_changes[new_path]
        new_rename_changes.append(
            TreeChange(CHANGE_RENAME, old_change.old, new_change.new)
        )
        # Mark the old changes for removal
        case_only_renames.add(old_change)
        case_only_renames.add(new_change)

    # Return new list with original ADD/DELETE changes replaced by renames
    result = [change for change in changes if change not in case_only_renames]
    result.extend(new_rename_changes)
    return result
2027def update_working_tree(
2028 repo: "Repo",
2029 old_tree_id: Optional[bytes],
2030 new_tree_id: bytes,
2031 change_iterator: Iterator["TreeChange"],
2032 honor_filemode: bool = True,
2033 validate_path_element: Optional[Callable[[bytes], bool]] = None,
2034 symlink_fn: Optional[Callable] = None,
2035 force_remove_untracked: bool = False,
2036 blob_normalizer: Optional["BlobNormalizer"] = None,
2037 tree_encoding: str = "utf-8",
2038 allow_overwrite_modified: bool = False,
2039) -> None:
2040 """Update the working tree and index to match a new tree.
2042 This function handles:
2043 - Adding new files
2044 - Updating modified files
2045 - Removing deleted files
2046 - Cleaning up empty directories
2048 Args:
2049 repo: Repository object
2050 old_tree_id: SHA of the tree before the update
2051 new_tree_id: SHA of the tree to update to
2052 change_iterator: Iterator of TreeChange objects to apply
2053 honor_filemode: An optional flag to honor core.filemode setting
2054 validate_path_element: Function to validate path elements to check out
2055 symlink_fn: Function to use for creating symlinks
2056 force_remove_untracked: If True, remove files that exist in working
2057 directory but not in target tree, even if old_tree_id is None
2058 blob_normalizer: An optional BlobNormalizer to use for converting line
2059 endings when writing blobs to the working directory.
2060 tree_encoding: Encoding used for tree paths (default: utf-8)
2061 allow_overwrite_modified: If False, raise an error when attempting to
2062 overwrite files that have been modified compared to old_tree_id
2063 """
2064 if validate_path_element is None:
2065 validate_path_element = validate_path_element_default
2067 from .diff_tree import (
2068 CHANGE_ADD,
2069 CHANGE_COPY,
2070 CHANGE_DELETE,
2071 CHANGE_MODIFY,
2072 CHANGE_RENAME,
2073 CHANGE_UNCHANGED,
2074 )
2076 repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
2077 index = repo.open_index()
2079 # Convert iterator to list since we need multiple passes
2080 changes = list(change_iterator)
2082 # Transform case-only renames on case-insensitive filesystems
2083 import platform
2085 default_ignore_case = platform.system() in ("Windows", "Darwin")
2086 config = repo.get_config()
2087 ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)
2089 if ignore_case:
2090 config = repo.get_config()
2091 changes = detect_case_only_renames(changes, config)
2093 # Check for path conflicts where files need to become directories
2094 paths_becoming_dirs = set()
2095 for change in changes:
2096 if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
2097 path = change.new.path
2098 if b"/" in path: # This is a file inside a directory
2099 # Check if any parent path exists as a file in the old tree or changes
2100 parts = path.split(b"/")
2101 for i in range(1, len(parts)):
2102 parent = b"/".join(parts[:i])
2103 # See if this parent path is being deleted (was a file, becoming a dir)
2104 for other_change in changes:
2105 if (
2106 other_change.type == CHANGE_DELETE
2107 and other_change.old
2108 and other_change.old.path == parent
2109 ):
2110 paths_becoming_dirs.add(parent)
2112 # Check if any path that needs to become a directory has been modified
2113 for path in paths_becoming_dirs:
2114 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2115 try:
2116 current_stat = os.lstat(full_path)
2117 except FileNotFoundError:
2118 continue # File doesn't exist, nothing to check
2119 except OSError as e:
2120 raise OSError(
2121 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2122 ) from e
2124 if stat.S_ISREG(current_stat.st_mode):
2125 # Find the old entry for this path
2126 old_change = None
2127 for change in changes:
2128 if (
2129 change.type == CHANGE_DELETE
2130 and change.old
2131 and change.old.path == path
2132 ):
2133 old_change = change
2134 break
2136 if old_change:
2137 # Check if file has been modified
2138 file_matches = _check_file_matches(
2139 repo.object_store,
2140 full_path,
2141 old_change.old.sha,
2142 old_change.old.mode,
2143 current_stat,
2144 honor_filemode,
2145 blob_normalizer,
2146 path,
2147 )
2148 if not file_matches:
2149 raise OSError(
2150 f"Cannot replace modified file with directory: {path!r}"
2151 )
2153 # Check for uncommitted modifications before making any changes
2154 if not allow_overwrite_modified and old_tree_id:
2155 for change in changes:
2156 # Only check files that are being modified or deleted
2157 if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
2158 path = change.old.path
2159 if path.startswith(b".git") or not validate_path(
2160 path, validate_path_element
2161 ):
2162 continue
2164 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2165 try:
2166 current_stat = os.lstat(full_path)
2167 except FileNotFoundError:
2168 continue # File doesn't exist, nothing to check
2169 except OSError as e:
2170 raise OSError(
2171 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2172 ) from e
2174 if stat.S_ISREG(current_stat.st_mode):
2175 # Check if working tree file differs from old tree
2176 file_matches = _check_file_matches(
2177 repo.object_store,
2178 full_path,
2179 change.old.sha,
2180 change.old.mode,
2181 current_stat,
2182 honor_filemode,
2183 blob_normalizer,
2184 path,
2185 )
2186 if not file_matches:
2187 from .errors import WorkingTreeModifiedError
2189 raise WorkingTreeModifiedError(
2190 f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
2191 f"would be overwritten by checkout. "
2192 f"Please commit your changes or stash them before you switch branches."
2193 )
2195 # Apply the changes
2196 for change in changes:
2197 if change.type in (CHANGE_DELETE, CHANGE_RENAME):
2198 # Remove file/directory
2199 path = change.old.path
2200 if path.startswith(b".git") or not validate_path(
2201 path, validate_path_element
2202 ):
2203 continue
2205 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2206 try:
2207 delete_stat: Optional[os.stat_result] = os.lstat(full_path)
2208 except FileNotFoundError:
2209 delete_stat = None
2210 except OSError as e:
2211 raise OSError(
2212 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2213 ) from e
2215 _transition_to_absent(repo, path, full_path, delete_stat, index)
2217 if change.type in (
2218 CHANGE_ADD,
2219 CHANGE_MODIFY,
2220 CHANGE_UNCHANGED,
2221 CHANGE_COPY,
2222 CHANGE_RENAME,
2223 ):
2224 # Add or modify file
2225 path = change.new.path
2226 if path.startswith(b".git") or not validate_path(
2227 path, validate_path_element
2228 ):
2229 continue
2231 full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
2232 try:
2233 modify_stat: Optional[os.stat_result] = os.lstat(full_path)
2234 except FileNotFoundError:
2235 modify_stat = None
2236 except OSError as e:
2237 raise OSError(
2238 f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
2239 ) from e
2241 if S_ISGITLINK(change.new.mode):
2242 _transition_to_submodule(
2243 repo, path, full_path, modify_stat, change.new, index
2244 )
2245 else:
2246 _transition_to_file(
2247 repo.object_store,
2248 path,
2249 full_path,
2250 modify_stat,
2251 change.new,
2252 index,
2253 honor_filemode,
2254 symlink_fn,
2255 blob_normalizer,
2256 tree_encoding,
2257 )
2259 index.write()
def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk an index and yield the tree paths whose working-tree copy differs.

    Args:
      index: index to check
      root_path: filesystem path the index entries are relative to
      filter_blob_callback: optional callable ``(blob, tree_path) -> blob``
        applied to each on-disk blob before it is compared against the index
    Returns: iterator over paths with unstaged changes
    """
    root_bytes = root_path if isinstance(root_path, bytes) else os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        fs_path = _tree_to_fs_path(root_bytes, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        # The whole filesystem probe runs under one handler: a vanished file
        # at any point counts as a change.
        try:
            disk_stat = os.lstat(fs_path)
            if stat.S_ISDIR(disk_stat.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not (stat.S_ISREG(disk_stat.st_mode) or stat.S_ISLNK(disk_stat.st_mode)):
                # Sockets, fifos etc. are not comparable to index blobs.
                continue

            blob = blob_from_path_and_stat(fs_path, disk_stat)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path
def _tree_to_fs_path(
    root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes (encoded with tree_encoding)
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    # Git always uses "/" internally; swap in the platform separator if needed.
    if os_sep_bytes == b"/":
        rel_path = tree_path
    else:
        rel_path = tree_path.replace(b"/", os_sep_bytes)

    if sys.platform == "win32":
        # The Windows filesystem encoding may differ from the tree encoding,
        # so round-trip through str and let os.fsencode pick the right one.
        try:
            rel_path = os.fsencode(rel_path.decode(tree_encoding))
        except UnicodeDecodeError:
            # Bytes not valid in tree_encoding: keep them as-is.
            pass

    return os.path.join(root_path, rel_path)
def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.
      tree_encoding: Encoding to use for tree paths (default: utf-8)

    Returns: Git tree path as bytes (encoded with tree_encoding)
    """
    path_bytes = fs_path if isinstance(fs_path, bytes) else os.fsencode(fs_path)

    if sys.platform == "win32":
        # The filesystem encoding on Windows may differ from the tree
        # encoding, so re-encode via str.
        try:
            path_bytes = os.fsdecode(path_bytes).encode(tree_encoding)
        except UnicodeDecodeError:
            # Keep the original bytes when they cannot be decoded.
            pass

    # Git tree paths always use "/" regardless of the platform separator.
    if os_sep_bytes == b"/":
        return path_bytes
    return path_bytes.replace(os_sep_bytes, b"/")
def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory, if it is a submodule.

    Args:
      st: Stat result for the directory
      path: Filesystem path of the directory
    Returns: an ``IndexEntry`` with gitlink mode when the directory contains
      a ``.git`` and its submodule HEAD can be read; otherwise None
    """
    if not os.path.exists(os.path.join(path, b".git")):
        # Plain directories are never recorded in the index themselves.
        return None
    head = read_submodule_head(path)
    if head is None:
        return None
    return index_entry_from_stat(st, head, mode=S_IFGITLINK)
def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. For directories and
    non-existent files it returns None

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    mode = st.st_mode

    if stat.S_ISDIR(mode):
        # Directories only yield an entry when they are submodules.
        return index_entry_from_directory(st, path)

    if not (stat.S_ISREG(mode) or stat.S_ISLNK(mode)):
        # Anything else (fifo, socket, device) has no index representation.
        return None

    blob = blob_from_path_and_stat(path, st)
    if object_store is not None:
        object_store.add_object(blob)
    return index_entry_from_stat(st, blob.id)
def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for tree_path in paths:
        fs_path = _tree_to_fs_path(root_path, tree_path)
        try:
            fresh_entry = index_entry_from_path(fs_path, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            # Gone from disk (or became a plain directory): report as absent.
            fresh_entry = None
        yield tree_path, fresh_entry
def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    fresh = iter_fresh_entries(paths, root_path, object_store=object_store)
    for path, entry in fresh:
        if entry is not None:
            yield path, entry.sha, cleanup_mode(entry.mode)
        elif include_deleted:
            # Entry vanished on disk; report it explicitly when requested.
            yield path, None, None
def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This is the equivalent to running 'git commit -a'.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    # Iterating the index yields its tree paths; re-stat each one on disk and
    # overwrite the stored entry where a fresh one could be built.
    for tree_path, fresh_entry in iter_fresh_entries(index, root_path):
        if not fresh_entry:
            continue
        index[tree_path] = fresh_entry
class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.

    On clean exit the in-memory index is written (via ``SHA1Writer``) into the
    lock file opened by ``GitFile``; on error the lock file is aborted.
    """

    # Write-mode GitFile handle holding the index lock for the duration of
    # the context.
    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        # Path to the index file to be locked and rewritten.
        self._path = path

    def __enter__(self) -> Index:
        # Opening in write mode creates/claims the lock file for this index.
        f = GitFile(self._path, "wb")
        assert isinstance(f, _GitFile)  # GitFile in write mode always returns _GitFile
        self._file = f
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if exc_type is not None:
            # The with-body raised: discard the lock file, leaving the
            # on-disk index untouched; returning None propagates the error.
            self._file.abort()
            return
        try:
            from typing import BinaryIO, cast

            # SHA1Writer appends a trailing checksum as required by the
            # index file format.
            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            # NOTE(review): a failure during serialization is swallowed here
            # after aborting the lock file — callers never see the error.
            # Confirm that suppressing it (rather than re-raising) is intended.
            self._file.abort()
        else:
            # Closing commits the lock file; presumably GitFile renames it
            # over the real index — confirm against GitFile's contract.
            f.close()