# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Parser for the git index file format."""

import errno
import os
import shutil
import stat
import struct
import sys
import types
from collections.abc import Generator, Iterable, Iterator
from dataclasses import dataclass
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    BinaryIO,
    Callable,
    Optional,
    Union,
    cast,
)

if TYPE_CHECKING:
    from .config import Config
    from .diff_tree import TreeChange
    from .file import _GitFile
    from .line_ending import BlobNormalizer
    from .repo import Repo

from .file import GitFile
from .object_store import iter_tree_contents
from .objects import (
    S_IFGITLINK,
    S_ISGITLINK,
    Blob,
    ObjectID,
    Tree,
    hex_to_sha,
    sha_to_hex,
)
from .pack import ObjectContainer, SHA1Reader, SHA1Writer

# 2-bit stage (during merge)
FLAG_STAGEMASK = 0x3000
FLAG_STAGESHIFT = 12
FLAG_NAMEMASK = 0x0FFF

# assume-valid
FLAG_VALID = 0x8000

# extended flag (must be zero in version 2)
FLAG_EXTENDED = 0x4000

# used by sparse checkout
EXTENDED_FLAG_SKIP_WORKTREE = 0x4000

# used by "git add -N"
EXTENDED_FLAG_INTEND_TO_ADD = 0x2000

DEFAULT_VERSION = 2

# Index extension signatures
TREE_EXTENSION = b"TREE"
REUC_EXTENSION = b"REUC"
UNTR_EXTENSION = b"UNTR"
EOIE_EXTENSION = b"EOIE"
IEOT_EXTENSION = b"IEOT"


def _encode_varint(value: int) -> bytes:
    """Encode an integer using variable-width encoding.

    Same format as used for OFS_DELTA pack entries and index v4 path compression.
    Uses 7 bits per byte, with the high bit indicating continuation.

    Args:
      value: Integer to encode
    Returns:
      Encoded bytes
    """
    if value == 0:
        return b"\x00"

    result = []
    while value > 0:
        byte = value & 0x7F  # Take lower 7 bits
        value >>= 7
        if value > 0:
            byte |= 0x80  # Set continuation bit
        result.append(byte)

    return bytes(result)


def _decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode a variable-width encoded integer.

    Args:
      data: Bytes to decode from
      offset: Starting offset in data
    Returns:
      tuple of (decoded_value, new_offset)
    """
    value = 0
    shift = 0
    pos = offset

    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    return value, pos
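

# A quick round-trip of the varint helpers (example values worked out by
# hand, not taken from the original source): 300 is 0b1_0010_1100, so the
# low seven bits (0x2C) are emitted first with the continuation bit set,
# followed by the remaining bits:
#
#   _encode_varint(300)          # -> b"\xac\x02"
#   _decode_varint(b"\xac\x02")  # -> (300, 2)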


def _compress_path(path: bytes, previous_path: bytes) -> bytes:
    """Compress a path relative to the previous path for index version 4.

    Args:
      path: Path to compress
      previous_path: Previous path for comparison
    Returns:
      Compressed path data (varint prefix_len + suffix)
    """
    # Find the common prefix length
    common_len = 0
    min_len = min(len(path), len(previous_path))

    for i in range(min_len):
        if path[i] == previous_path[i]:
            common_len += 1
        else:
            break

    # The number of bytes to remove from the end of previous_path
    # to get the common prefix
    remove_len = len(previous_path) - common_len

    # The suffix to append
    suffix = path[common_len:]

    # Encode: varint(remove_len) + suffix + NUL
    return _encode_varint(remove_len) + suffix + b"\x00"


def _decompress_path(
    data: bytes, offset: int, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format.

    Args:
      data: Raw data containing compressed path
      offset: Starting offset in data
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, new_offset)
    """
    # Decode the number of bytes to remove from previous path
    remove_len, new_offset = _decode_varint(data, offset)

    # Find the NUL terminator for the suffix
    suffix_start = new_offset
    suffix_end = suffix_start
    while suffix_end < len(data) and data[suffix_end] != 0:
        suffix_end += 1

    if suffix_end >= len(data):
        raise ValueError("Unterminated path suffix in compressed entry")

    suffix = data[suffix_start:suffix_end]
    new_offset = suffix_end + 1  # Skip the NUL terminator

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, new_offset
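

# An illustrative v4 path delta (the paths here are assumptions, not from
# the original source): b"src/lib.py" -> b"src/main.py" share the 4-byte
# prefix b"src/", so six trailing bytes of the previous path are dropped
# and b"main.py" is appended:
#
#   _compress_path(b"src/main.py", b"src/lib.py")
#   # -> b"\x06main.py\x00"  (varint(6) + suffix + NUL)
#   _decompress_path(b"\x06main.py\x00", 0, b"src/lib.py")
#   # -> (b"src/main.py", 9)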


def _decompress_path_from_stream(
    f: BinaryIO, previous_path: bytes
) -> tuple[bytes, int]:
    """Decompress a path from index version 4 compressed format, reading from stream.

    Args:
      f: File-like object to read from
      previous_path: Previous path for decompression
    Returns:
      tuple of (decompressed_path, bytes_consumed)
    """
    # Decode the varint for remove_len by reading byte by byte
    remove_len = 0
    shift = 0
    bytes_consumed = 0

    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading varint")
        byte = byte_data[0]
        bytes_consumed += 1
        remove_len |= (byte & 0x7F) << shift
        shift += 7
        if not (byte & 0x80):  # No continuation bit
            break

    # Read the suffix until NUL terminator
    suffix = b""
    while True:
        byte_data = f.read(1)
        if not byte_data:
            raise ValueError("Unexpected end of file while reading path suffix")
        byte = byte_data[0]
        bytes_consumed += 1
        if byte == 0:  # NUL terminator
            break
        suffix += bytes([byte])

    # Reconstruct the path
    if remove_len > len(previous_path):
        raise ValueError(
            f"Invalid path compression: trying to remove {remove_len} bytes from {len(previous_path)}-byte path"
        )

    prefix = previous_path[:-remove_len] if remove_len > 0 else previous_path
    path = prefix + suffix

    return path, bytes_consumed


class Stage(Enum):
    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
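

# A worked example of how the stage is packed into the flags word (values
# computed from the constants above, not from the original source): stage 2
# ("this"/ours during a merge) lives in bits 12-13, so
#
#   flags = Stage.MERGE_CONFLICT_THIS.value << FLAG_STAGESHIFT  # 0x2000
#   Stage((flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
#   # -> Stage.MERGE_CONFLICT_THIS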


@dataclass
class SerializedIndexEntry:
    name: bytes
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)


@dataclass
class IndexExtension:
    """Base class for index extensions."""

    signature: bytes
    data: bytes

    @classmethod
    def from_raw(cls, signature: bytes, data: bytes) -> "IndexExtension":
        """Create an extension from raw data.

        Args:
          signature: 4-byte extension signature
          data: Extension data
        Returns:
          Parsed extension object
        """
        if signature == TREE_EXTENSION:
            return TreeExtension.from_bytes(data)
        elif signature == REUC_EXTENSION:
            return ResolveUndoExtension.from_bytes(data)
        elif signature == UNTR_EXTENSION:
            return UntrackedExtension.from_bytes(data)
        else:
            # Unknown extension - just store raw data
            return cls(signature, data)

    def to_bytes(self) -> bytes:
        """Serialize extension to bytes."""
        return self.data


class TreeExtension(IndexExtension):
    """Tree cache extension."""

    def __init__(self, entries: list[tuple[bytes, bytes, int]]) -> None:
        self.entries = entries
        super().__init__(TREE_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "TreeExtension":
        # TODO: Implement tree cache parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement tree cache serialization
        return b""


class ResolveUndoExtension(IndexExtension):
    """Resolve undo extension for recording merge conflicts."""

    def __init__(self, entries: list[tuple[bytes, list[tuple[int, bytes]]]]) -> None:
        self.entries = entries
        super().__init__(REUC_EXTENSION, b"")

    @classmethod
    def from_bytes(cls, data: bytes) -> "ResolveUndoExtension":
        # TODO: Implement resolve undo parsing
        return cls([])

    def to_bytes(self) -> bytes:
        # TODO: Implement resolve undo serialization
        return b""


class UntrackedExtension(IndexExtension):
    """Untracked cache extension."""

    def __init__(self, data: bytes) -> None:
        super().__init__(UNTR_EXTENSION, data)

    @classmethod
    def from_bytes(cls, data: bytes) -> "UntrackedExtension":
        return cls(data)


@dataclass
class IndexEntry:
    ctime: Union[int, float, tuple[int, int]]
    mtime: Union[int, float, tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int = 0
    extended_flags: int = 0

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
            flags=serialized.flags,
            extended_flags=serialized.extended_flags,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        # Clear out any existing stage bits, then set them from the Stage.
        new_flags = self.flags & ~FLAG_STAGEMASK
        new_flags |= stage.value << FLAG_STAGESHIFT
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=new_flags,
            extended_flags=self.extended_flags,
        )

    def stage(self) -> Stage:
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)

    @property
    def skip_worktree(self) -> bool:
        """Return True if the skip-worktree bit is set in extended_flags."""
        return bool(self.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)

    def set_skip_worktree(self, skip: bool = True) -> None:
        """Helper method to set or clear the skip-worktree bit in extended_flags.

        Also sets FLAG_EXTENDED in self.flags if needed.
        """
        if skip:
            # Turn on the skip-worktree bit
            self.extended_flags |= EXTENDED_FLAG_SKIP_WORKTREE
            # Also ensure the main 'extended' bit is set in flags
            self.flags |= FLAG_EXTENDED
        else:
            # Turn off the skip-worktree bit
            self.extended_flags &= ~EXTENDED_FLAG_SKIP_WORKTREE
            # Optionally unset the main extended bit if no extended flags remain
            if self.extended_flags == 0:
                self.flags &= ~FLAG_EXTENDED
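

# Minimal sketch of toggling the sparse-checkout bit on an entry (the field
# values below are placeholders, not from the original source):
#
#   entry = IndexEntry(0, 0, 0, 0, 0o100644, 0, 0, 0, b"0" * 40)
#   entry.set_skip_worktree(True)
#   assert entry.skip_worktree and entry.flags & FLAG_EXTENDED
#   entry.set_skip_worktree(False)
#   assert not entry.flags & FLAG_EXTENDED  # no extended flags remain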


class ConflictedIndexEntry:
    """Index entry that represents a conflict."""

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other


class UnmergedEntries(Exception):
    """Unmerged entries exist in the index."""


def pathsplit(path: bytes) -> tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename
    """
    try:
        (dirname, basename) = path.rsplit(b"/", 1)
    except ValueError:
        return (b"", path)
    else:
        return (dirname, basename)


def pathjoin(*args: bytes) -> bytes:
    """Join a /-delimited path."""
    return b"/".join([p for p in args if p])
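

# Example round-trips (the paths are illustrative, not from the original
# source):
#
#   pathsplit(b"foo/bar/baz")     # -> (b"foo/bar", b"baz")
#   pathsplit(b"baz")             # -> (b"", b"baz")
#   pathjoin(b"foo/bar", b"baz")  # -> b"foo/bar/baz"
#   pathjoin(b"", b"baz")         # -> b"baz" (empty parts are dropped)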


def read_cache_time(f: BinaryIO) -> tuple[int, int]:
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    return struct.unpack(">LL", f.read(8))


def write_cache_time(f: BinaryIO, t: Union[int, float, tuple[int, int]]) -> None:
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        (secs, nsecs) = divmod(t, 1.0)
        t = (int(secs), int(nsecs * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
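

# Sketch of the on-disk cache-time round-trip (stream and values are
# assumptions, not from the original source); a float is split into
# (seconds, nanoseconds) before packing:
#
#   import io
#   buf = io.BytesIO()
#   write_cache_time(buf, 1700000000.25)
#   buf.seek(0)
#   read_cache_time(buf)  # -> (1700000000, 250000000)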


def read_cache_entry(
    f: BinaryIO, version: int, previous_path: bytes = b""
) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0

    if version >= 4:
        # Version 4: paths are always compressed (name_len should be 0)
        name, consumed = _decompress_path_from_stream(f, previous_path)
    else:
        # Versions < 4: regular name reading
        name = f.read(flags & FLAG_NAMEMASK)

    # Padding:
    if version < 4:
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())

    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )


def write_cache_entry(
    f: BinaryIO, entry: SerializedIndexEntry, version: int, previous_path: bytes = b""
) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: IndexEntry to write
      version: Index format version
      previous_path: Previous entry's path (for version 4 compression)
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)

    if version >= 4:
        # Version 4: use compression but set name_len to actual filename length
        # This matches how C Git implements index v4 flags
        compressed_path = _compress_path(entry.name, previous_path)
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    else:
        # Versions < 4: include actual name length
        flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)

    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")

    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            entry.size,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))

    if version >= 4:
        # Version 4: always write compressed path
        f.write(compressed_path)
    else:
        # Versions < 4: write regular path and padding
        f.write(entry.name)
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))


class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version: int) -> None:
        self.index_format_version = version


def read_index_header(f: BinaryIO) -> tuple[int, int]:
    """Read an index header from a file.

    Returns:
      tuple of (version, num_entries)
    """
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    (version, num_entries) = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3, 4):
        raise UnsupportedIndexFormat(version)
    return version, num_entries
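

# The 12-byte header this parses is fixed: b"DIRC", a 4-byte big-endian
# version, then a 4-byte big-endian entry count. A hand-built sketch
# (assumed values):
#
#   import io, struct
#   header = b"DIRC" + struct.pack(">LL", 2, 1)
#   read_index_header(io.BytesIO(header))  # -> (2, 1)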


def write_index_extension(f: BinaryIO, extension: IndexExtension) -> None:
    """Write an index extension.

    Args:
      f: File-like object to write to
      extension: Extension to write
    """
    data = extension.to_bytes()
    f.write(extension.signature)
    f.write(struct.pack(">I", len(data)))
    f.write(data)


def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    version, num_entries = read_index_header(f)
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        yield entry


def read_index_dict_with_version(
    f: BinaryIO,
) -> tuple[
    dict[bytes, Union[IndexEntry, ConflictedIndexEntry]], int, list[IndexExtension]
]:
    """Read an index file and return it as a dictionary along with the version.

    Returns:
      tuple of (entries_dict, version, extensions)
    """
    version, num_entries = read_index_header(f)

    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    previous_path = b""
    for i in range(num_entries):
        entry = read_cache_entry(f, version, previous_path)
        previous_path = entry.name
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)

    # Read extensions
    extensions = []
    while True:
        # Check if we're at the end (20 bytes before EOF for SHA checksum)
        current_pos = f.tell()
        f.seek(0, 2)  # Seek to EOF
        eof_pos = f.tell()
        f.seek(current_pos)

        if current_pos >= eof_pos - 20:
            break

        # Try to read extension signature
        signature = f.read(4)
        if len(signature) < 4:
            break

        # Check if it's a valid extension signature (4 uppercase letters)
        if not all(65 <= b <= 90 for b in signature):
            # Not an extension, seek back
            f.seek(-4, 1)
            break

        # Read extension size
        size_data = f.read(4)
        if len(size_data) < 4:
            break
        size = struct.unpack(">I", size_data)[0]

        # Read extension data
        data = f.read(size)
        if len(data) < size:
            break

        extension = IndexExtension.from_raw(signature, data)
        extensions.append(extension)

    return ret, version, extensions


def read_index_dict(
    f: BinaryIO,
) -> dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary.

    Keys are paths; a path with merge conflicts maps to a single
    ConflictedIndexEntry that bundles the stage 1-3 entries.

    Args:
      f: File object to read from.
    """
    ret: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for entry in read_index(f):
        stage = entry.stage()
        if stage == Stage.NORMAL:
            ret[entry.name] = IndexEntry.from_serialized(entry)
        else:
            existing = ret.setdefault(entry.name, ConflictedIndexEntry())
            if isinstance(existing, IndexEntry):
                raise AssertionError(f"Non-conflicted entry for {entry.name!r} exists")
            if stage == Stage.MERGE_CONFLICT_ANCESTOR:
                existing.ancestor = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_THIS:
                existing.this = IndexEntry.from_serialized(entry)
            elif stage == Stage.MERGE_CONFLICT_OTHER:
                existing.other = IndexEntry.from_serialized(entry)
    return ret


def write_index(
    f: BinaryIO,
    entries: list[SerializedIndexEntry],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file.

    Args:
      f: File-like object to write to
      entries: Iterable over the entries to write
      version: Version number to write
      extensions: Optional list of extensions to write
    """
    if version is None:
        version = DEFAULT_VERSION
    # If any entry uses extended flags, the index must be at least version 3
    uses_extended_flags = any(e.extended_flags != 0 for e in entries)
    if uses_extended_flags and version < 3:
        # Bump the version to 3
        version = 3
    # Final safety check: no extended flags may appear in a pre-v3 index
    if version < 3:
        for e in entries:
            if e.extended_flags != 0:
                raise AssertionError("Attempt to use extended flags in index < v3")
    # Write the header and entries
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", version, len(entries)))
    previous_path = b""
    for entry in entries:
        write_cache_entry(f, entry, version=version, previous_path=previous_path)
        previous_path = entry.name

    # Write extensions
    if extensions:
        for extension in extensions:
            write_index_extension(f, extension)


def write_index_dict(
    f: BinaryIO,
    entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
    extensions: Optional[list[IndexExtension]] = None,
) -> None:
    """Write an index file based on the contents of a dictionary,
    being careful to sort by path and then by stage.
    """
    entries_list = []
    for key in sorted(entries):
        value = entries[key]
        if isinstance(value, ConflictedIndexEntry):
            if value.ancestor is not None:
                entries_list.append(
                    value.ancestor.serialize(key, Stage.MERGE_CONFLICT_ANCESTOR)
                )
            if value.this is not None:
                entries_list.append(
                    value.this.serialize(key, Stage.MERGE_CONFLICT_THIS)
                )
            if value.other is not None:
                entries_list.append(
                    value.other.serialize(key, Stage.MERGE_CONFLICT_OTHER)
                )
        else:
            entries_list.append(value.serialize(key, Stage.NORMAL))
    write_index(f, entries_list, version=version, extensions=extensions)


def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      mode
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    elif stat.S_ISDIR(mode):
        return stat.S_IFDIR
    elif S_ISGITLINK(mode):
        return S_IFGITLINK
    ret = stat.S_IFREG | 0o644
    if mode & 0o100:
        ret |= 0o111
    return ret
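

# Illustrative normalization (input modes assumed): arbitrary permission
# bits collapse to the two file modes git stores for blobs, keyed off the
# user execute bit:
#
#   oct(cleanup_mode(0o100664))  # -> '0o100644' (non-executable file)
#   oct(cleanup_mode(0o100775))  # -> '0o100755' (user-executable file)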


class Index:
    """A Git Index file."""

    _byname: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(
        self,
        filename: Union[bytes, str, os.PathLike],
        read: bool = True,
        skip_hash: bool = False,
        version: Optional[int] = None,
    ) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
          skip_hash: Whether to skip SHA1 hash when writing (for manyfiles feature)
          version: Index format version to use (None = auto-detect from file or use default)
        """
        self._filename = os.fspath(filename)
        # TODO(jelmer): Store the version returned by read_index
        self._version = version
        self._skip_hash = skip_hash
        self._extensions: list[IndexExtension] = []
        self.clear()
        if read:
            self.read()

    @property
    def path(self) -> Union[bytes, str]:
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # Filter out extensions with no meaningful data
            meaningful_extensions = []
            for ext in self._extensions:
                # Skip extensions that have empty data
                ext_data = ext.to_bytes()
                if ext_data:
                    meaningful_extensions.append(ext)

            if self._skip_hash:
                # When skipHash is enabled, write the index without computing SHA1
                write_index_dict(
                    cast(BinaryIO, f),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                # Write 20 zero bytes instead of SHA1
                f.write(b"\x00" * 20)
                f.close()
            else:
                sha1_writer = SHA1Writer(cast(BinaryIO, f))
                write_index_dict(
                    cast(BinaryIO, sha1_writer),
                    self._byname,
                    version=self._version,
                    extensions=meaningful_extensions,
                )
                sha1_writer.close()
        except:
            f.close()
            raise

    def read(self) -> None:
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            return
        f = GitFile(self._filename, "rb")
        try:
            sha1_reader = SHA1Reader(f)
            entries, version, extensions = read_index_dict_with_version(
                cast(BinaryIO, sha1_reader)
            )
            self._version = version
            self._extensions = extensions
            self.update(entries)
            # Extensions have already been read by read_index_dict_with_version
            sha1_reader.check_sha(allow_empty=True)
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path.

        Returns: Either an IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths in this index."""
        return iter(self._byname)

    def __contains__(self, key: bytes) -> bool:
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path."""
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree."""
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self) -> None:
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def items(self) -> Iterator[tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        return iter(self._byname.items())

    def update(
        self, entries: dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]
    ) -> None:
        for key, value in entries.items():
            self[key] = value

    def paths(self) -> Generator[bytes, None, None]:
        yield from self._byname.keys()

    def changes_from_tree(
        self,
        object_store: ObjectContainer,
        tree: ObjectID,
        want_unchanged: bool = False,
    ) -> Generator[
        tuple[
            tuple[Optional[bytes], Optional[bytes]],
            tuple[Optional[int], Optional[int]],
            tuple[Optional[bytes], Optional[bytes]],
        ],
        None,
        None,
    ]:
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path: bytes) -> tuple[bytes, int]:
            entry = self[path]
            if hasattr(entry, "sha") and hasattr(entry, "mode"):
                return entry.sha, cleanup_mode(entry.mode)
            else:
                # Handle ConflictedIndexEntry case
                return b"", 0

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store: ObjectContainer) -> bytes:
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())


def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    trees: dict[bytes, Any] = {b"": {}}

    def add_tree(path: bytes) -> dict[bytes, Any]:
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree: dict[bytes, Any] = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path: bytes) -> bytes:
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
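

# Minimal usage sketch (the store and blob below are assumptions, not from
# the original source): given blobs already present in an object store,
# commit_tree builds the intermediate tree objects and returns the root
# tree SHA:
#
#   from dulwich.object_store import MemoryObjectStore
#   store = MemoryObjectStore()
#   blob = Blob.from_string(b"hello\n")
#   store.add_object(blob)
#   root_id = commit_tree(store, [(b"dir/hello.txt", blob.id, 0o100644)])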


def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    return commit_tree(object_store, index.iterobjects())


def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged: bool = False,
) -> Iterable[
    tuple[
        tuple[Optional[bytes], Optional[bytes]],
        tuple[Optional[int], Optional[int]],
        tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support an include_trees option
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))


def index_entry_from_stat(
    stat_val: os.stat_result,
    hex_sha: bytes,
    mode: Optional[int] = None,
) -> IndexEntry:
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional mode to use instead of deriving it from stat_val
    """
    if mode is None:
        mode = cleanup_mode(stat_val.st_mode)

    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
        flags=0,
        extended_flags=0,
    )


if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        def __init__(self, errno: int, msg: str, filename: Optional[str]) -> None:
            super(PermissionError, self).__init__(
                errno,
                f"Unable to create symlink; do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(
        src: Union[str, bytes],
        dst: Union[str, bytes],
        target_is_directory: bool = False,
        *,
        dir_fd: Optional[int] = None,
    ) -> None:
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(
                e.errno or 0, e.strerror or "", e.filename
            ) from e
else:
    symlink = os.symlink


def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode: bool = True,
    tree_encoding: str = "utf-8",
    symlink_fn: Optional[Callable] = None,
) -> os.stat_result:
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode symlink targets on Windows
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            _remove_file_with_readonly_handling(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents_str = contents.decode(tree_encoding)
            target_path_str = target_path.decode(tree_encoding)
            (symlink_fn or symlink)(contents_str, target_path_str)
        else:
            (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)


INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def _normalize_path_element_default(element: bytes) -> bytes:
    """Normalize path element for default case-insensitive comparison."""
    return element.lower()


def _normalize_path_element_ntfs(element: bytes) -> bytes:
    """Normalize path element for NTFS filesystem."""
    return element.rstrip(b". ").lower()


def _normalize_path_element_hfs(element: bytes) -> bytes:
    """Normalize path element for HFS+ filesystem."""
    import unicodedata

    # Decode to Unicode (let UnicodeDecodeError bubble up)
    element_str = element.decode("utf-8", errors="strict")

    # Remove HFS+ ignorable characters
    filtered = "".join(c for c in element_str if ord(c) not in HFS_IGNORABLE_CHARS)
    # Normalize to NFD
    normalized = unicodedata.normalize("NFD", filtered)
    return normalized.lower().encode("utf-8", errors="strict")


def get_path_element_normalizer(config: "Config") -> Callable[[bytes], bytes]:
    """Get the appropriate path element normalization function based on config.

    Args:
      config: Repository configuration object

    Returns:
      Function that normalizes path elements for the configured filesystem
    """
    if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
        return _normalize_path_element_ntfs
    elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
        return _normalize_path_element_hfs
    else:
        return _normalize_path_element_default


def validate_path_element_default(element: bytes) -> bool:
    return _normalize_path_element_default(element) not in INVALID_DOTNAMES


def validate_path_element_ntfs(element: bytes) -> bool:
    normalized = _normalize_path_element_ntfs(element)
    if normalized in INVALID_DOTNAMES:
        return False
    if normalized == b"git~1":
        return False
    return True


# HFS+ ignorable Unicode codepoints (from Git's utf8.c)
HFS_IGNORABLE_CHARS = {
    0x200C,  # ZERO WIDTH NON-JOINER
    0x200D,  # ZERO WIDTH JOINER
    0x200E,  # LEFT-TO-RIGHT MARK
    0x200F,  # RIGHT-TO-LEFT MARK
    0x202A,  # LEFT-TO-RIGHT EMBEDDING
    0x202B,  # RIGHT-TO-LEFT EMBEDDING
    0x202C,  # POP DIRECTIONAL FORMATTING
    0x202D,  # LEFT-TO-RIGHT OVERRIDE
    0x202E,  # RIGHT-TO-LEFT OVERRIDE
    0x206A,  # INHIBIT SYMMETRIC SWAPPING
    0x206B,  # ACTIVATE SYMMETRIC SWAPPING
    0x206C,  # INHIBIT ARABIC FORM SHAPING
    0x206D,  # ACTIVATE ARABIC FORM SHAPING
    0x206E,  # NATIONAL DIGIT SHAPES
    0x206F,  # NOMINAL DIGIT SHAPES
    0xFEFF,  # ZERO WIDTH NO-BREAK SPACE
}


def validate_path_element_hfs(element: bytes) -> bool:
    """Validate path element for HFS+ filesystem.

    Equivalent to Git's is_hfs_dotgit and related checks.
    Uses NFD normalization and ignores HFS+ ignorable characters.
    """
    try:
        normalized = _normalize_path_element_hfs(element)
    except UnicodeDecodeError:
        # Malformed UTF-8 - be conservative and reject
        return False

    # Check against invalid names
    if normalized in INVALID_DOTNAMES:
        return False

    # Also check for 8.3 short name
    if normalized == b"git~1":
        return False

    return True


def validate_path(
    path: bytes,
    element_validator: Callable[[bytes], bool] = validate_path_element_default,
) -> bool:
    """Check that a /-delimited path is valid by validating each element.

    The default element validator just refuses .git and dot directories.
    """
    return all(element_validator(p) for p in path.split(b"/"))
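

# Example checks (paths are illustrative, not from the original source):
# every "/"-separated element must pass the element validator, so embedded
# .git components are rejected:
#
#   validate_path(b"src/main.py")      # -> True
#   validate_path(b"src/.GIT/config")  # -> False (case-insensitive match)
#   validate_path(b"a/git~1/b", validate_path_element_ntfs)  # -> False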


def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element: Callable[[bytes], bool] = validate_path_element_default,
    symlink_fn: Optional[Callable] = None,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
) -> None:
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory.
      tree_encoding: Encoding used for tree paths (default: utf-8)

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path, tree_encoding)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            # Apply blob normalization for checkout if normalizer is provided
            if blob_normalizer is not None:
                obj = blob_normalizer.checkout_normalize(obj, entry.path)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                tree_encoding=tree_encoding,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()


def blob_from_path_and_mode(
    fs_path: bytes, mode: int, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob


def blob_from_path_and_stat(
    fs_path: bytes, st: os.stat_result, tree_encoding: str = "utf-8"
) -> Blob:
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
    Returns: A `Blob` object
    """
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)


def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        return None


def _has_directory_changed(tree_path: bytes, entry: IndexEntry) -> bool:
    """Check if a directory has changed after getting an error.

    When handling an error trying to create a blob from a path, call this
    function. It will check if the path is a directory. If it's a directory
    and a submodule, check the submodule head to see if it has changed. If
    not, consider the file as changed as Git tracked a file and not a
    directory.

    Return True if the given path should be considered changed, False
    otherwise (including when the path is not a directory).
    """
    # This is actually a directory
    if os.path.exists(os.path.join(tree_path, b".git")):
        # Submodule
        head = read_submodule_head(tree_path)
        if entry.sha != head:
            return True
    else:
        # The file was changed to a directory, so consider it removed.
        return True

    return False


os_sep_bytes = os.sep.encode("ascii")


def _ensure_parent_dir_exists(full_path: bytes) -> None:
    """Ensure parent directory exists, checking no parent is a file."""
    parent_dir = os.path.dirname(full_path)
    if parent_dir and not os.path.exists(parent_dir):
        # Check if any parent in the path is a file
        parts = parent_dir.split(os_sep_bytes)
        for i in range(len(parts)):
            partial_path = os_sep_bytes.join(parts[: i + 1])
            if (
                partial_path
                and os.path.exists(partial_path)
                and not os.path.isdir(partial_path)
            ):
                # Parent path is a file, this is an error
                raise OSError(
                    f"Cannot create directory, parent path is a file: {partial_path!r}"
                )
        os.makedirs(parent_dir)


def _remove_file_with_readonly_handling(path: bytes) -> None:
    """Remove a file, handling read-only files on Windows.

    Args:
      path: Path to the file to remove
    """
    try:
        os.unlink(path)
    except PermissionError:
        # On Windows, remove read-only attribute and retry
        if sys.platform == "win32":
            os.chmod(path, stat.S_IWRITE | stat.S_IREAD)
            os.unlink(path)
        else:
            raise


def _remove_empty_parents(path: bytes, stop_at: bytes) -> None:
    """Remove empty parent directories up to stop_at."""
    parent = os.path.dirname(path)
    while parent and parent != stop_at:
        try:
            os.rmdir(parent)
            parent = os.path.dirname(parent)
        except FileNotFoundError:
            # Directory doesn't exist - stop trying
            break
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                # Directory not empty - stop trying
                break
            raise


def _check_symlink_matches(
    full_path: bytes, repo_object_store, entry_sha: bytes
) -> bool:
    """Check if symlink target matches expected target.

    Returns True if symlink matches, False if it doesn't match.
    """
    try:
        current_target = os.readlink(full_path)
        blob_obj = repo_object_store[entry_sha]
        expected_target = blob_obj.as_raw_string()
        if isinstance(current_target, str):
            current_target = current_target.encode()
        return current_target == expected_target
    except FileNotFoundError:
        # Symlink doesn't exist
        return False
    except OSError as e:
        if e.errno == errno.EINVAL:
            # Not a symlink
            return False
        raise


def _check_file_matches(
    repo_object_store,
    full_path: bytes,
    entry_sha: bytes,
    entry_mode: int,
    current_stat: os.stat_result,
    honor_filemode: bool,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_path: Optional[bytes] = None,
) -> bool:
    """Check if a file on disk matches the expected git object.

    Returns True if file matches, False if it doesn't match.
    """
    # Check mode first (if honor_filemode is True)
    if honor_filemode:
        current_mode = stat.S_IMODE(current_stat.st_mode)
        expected_mode = stat.S_IMODE(entry_mode)

        # For regular files, only check the user executable bit, not group/other permissions
        # This matches Git's behavior where umask differences don't count as modifications
        if stat.S_ISREG(current_stat.st_mode):
            # Normalize regular file modes to ignore group/other write permissions
            current_mode_normalized = (
                current_mode & 0o755
            )  # Keep only user rwx and all read+execute
            expected_mode_normalized = expected_mode & 0o755

            # For Git compatibility, regular files should be either 644 or 755
            if expected_mode_normalized not in (0o644, 0o755):
                expected_mode_normalized = 0o644  # Default for regular files
            if current_mode_normalized not in (0o644, 0o755):
                # Determine if it should be executable based on user execute bit
                if current_mode & 0o100:  # User execute bit is set
                    current_mode_normalized = 0o755
                else:
                    current_mode_normalized = 0o644

            if current_mode_normalized != expected_mode_normalized:
                return False
        else:
            # For non-regular files (symlinks, etc.), check mode exactly
            if current_mode != expected_mode:
                return False

    # If mode matches (or we don't care), check content via size first
    blob_obj = repo_object_store[entry_sha]
    if current_stat.st_size != blob_obj.raw_length():
        return False

    # Size matches, check actual content
    try:
        with open(full_path, "rb") as f:
            current_content = f.read()
            expected_content = blob_obj.as_raw_string()
            if blob_normalizer and tree_path is not None:
                normalized_blob = blob_normalizer.checkout_normalize(
                    blob_obj, tree_path
                )
                expected_content = normalized_blob.as_raw_string()
            return current_content == expected_content
    except (FileNotFoundError, PermissionError, IsADirectoryError):
        return False


def _transition_to_submodule(repo, path, full_path, current_stat, entry, index):
    """Transition any type to submodule."""
    from .submodule import ensure_submodule_placeholder

    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Already a directory, just ensure .git file exists
        ensure_submodule_placeholder(repo, path)
    else:
        # Remove whatever is there and create submodule
        if current_stat is not None:
            _remove_file_with_readonly_handling(full_path)
        ensure_submodule_placeholder(repo, path)

    st = os.lstat(full_path)
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_file(
    object_store,
    path,
    full_path,
    current_stat,
    entry,
    index,
    honor_filemode,
    symlink_fn,
    blob_normalizer,
    tree_encoding="utf-8",
):
    """Transition any type to regular file or symlink."""
    # Check if we need to update
    if (
        current_stat is not None
        and stat.S_ISREG(current_stat.st_mode)
        and not stat.S_ISLNK(entry.mode)
    ):
        # File to file - check if update needed
        file_matches = _check_file_matches(
            object_store,
            full_path,
            entry.sha,
            entry.mode,
            current_stat,
            honor_filemode,
            blob_normalizer,
            path,
        )
        needs_update = not file_matches
    elif (
        current_stat is not None
        and stat.S_ISLNK(current_stat.st_mode)
        and stat.S_ISLNK(entry.mode)
    ):
        # Symlink to symlink - check if update needed
        symlink_matches = _check_symlink_matches(full_path, object_store, entry.sha)
        needs_update = not symlink_matches
    else:
        needs_update = True

    if not needs_update:
        # Just update index - current_stat should always be valid here since we're not updating
        index[path] = index_entry_from_stat(current_stat, entry.sha)
        return

    # Remove existing entry if needed
    if current_stat is not None and stat.S_ISDIR(current_stat.st_mode):
        # Remove directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents:
            if dir_contents != {git_file_name}:
                raise IsADirectoryError(
                    f"Cannot replace submodule with untracked files: {full_path!r}"
                )
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno == errno.ENOTEMPTY:
                    raise IsADirectoryError(
                        f"Cannot replace non-empty directory with file: {full_path!r}"
                    )
                raise
    elif current_stat is not None:
        _remove_file_with_readonly_handling(full_path)

    # Ensure parent directory exists
    _ensure_parent_dir_exists(full_path)

    # Write the file
    blob_obj = object_store[entry.sha]
    assert isinstance(blob_obj, Blob)
    if blob_normalizer:
        blob_obj = blob_normalizer.checkout_normalize(blob_obj, path)
    st = build_file_from_blob(
        blob_obj,
        entry.mode,
        full_path,
        honor_filemode=honor_filemode,
        tree_encoding=tree_encoding,
        symlink_fn=symlink_fn,
    )
    index[path] = index_entry_from_stat(st, entry.sha)


def _transition_to_absent(repo, path, full_path, current_stat, index):
    """Remove any type of entry."""
    if current_stat is None:
        return

    if stat.S_ISDIR(current_stat.st_mode):
        # Check if it's a submodule directory
        dir_contents = set(os.listdir(full_path))
        git_file_name = b".git" if isinstance(full_path, bytes) else ".git"

        if git_file_name in dir_contents and dir_contents == {git_file_name}:
            shutil.rmtree(full_path)
        else:
            try:
                os.rmdir(full_path)
            except OSError as e:
                if e.errno not in (errno.ENOTEMPTY, errno.EEXIST):
                    raise
    else:
        _remove_file_with_readonly_handling(full_path)

    try:
        del index[path]
    except KeyError:
        pass

    # Try to remove empty parent directories
    _remove_empty_parents(
        full_path, repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    )


def detect_case_only_renames(
    changes: list["TreeChange"],
    config: "Config",
) -> list["TreeChange"]:
    """Detect and transform case-only renames in a list of tree changes.

    This function identifies file renames that only differ in case (e.g.,
    README.txt -> readme.txt) and transforms matching ADD/DELETE pairs into
    CHANGE_RENAME operations. It uses filesystem-appropriate path normalization
    based on the repository configuration.

    Args:
      changes: List of TreeChange objects representing file changes
      config: Repository configuration object
    Returns:
      New list of TreeChange objects with case-only renames converted to
      CHANGE_RENAME
    """
    import logging

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        TreeChange,
    )

    # Build dictionaries of old and new paths with their normalized forms
    old_paths_normalized = {}
    new_paths_normalized = {}
    old_changes = {}  # Map from old path to change object
    new_changes = {}  # Map from new path to change object

    # Get the appropriate normalizer based on config
    normalize_func = get_path_element_normalizer(config)

    def normalize_path(path: bytes) -> bytes:
        """Normalize entire path using element normalization."""
        return b"/".join(normalize_func(part) for part in path.split(b"/"))

    # Pre-normalize all paths once to avoid repeated normalization. A RENAME
    # is treated as DELETE + ADD for case-only detection, so its old path goes
    # in the old map and its new path in the new map.
    for change in changes:
        if change.type in (CHANGE_DELETE, CHANGE_RENAME) and change.old:
            try:
                normalized = normalize_path(change.old.path)
            except UnicodeDecodeError:
                logging.warning(
                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
                    change.old.path,
                )
            else:
                old_paths_normalized[normalized] = change.old.path
                old_changes[change.old.path] = change

        if (
            change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY)
            and change.new
        ):
            try:
                normalized = normalize_path(change.new.path)
            except UnicodeDecodeError:
                logging.warning(
                    "Skipping case-only rename detection for path with invalid UTF-8: %r",
                    change.new.path,
                )
            else:
                new_paths_normalized[normalized] = change.new.path
                new_changes[change.new.path] = change

    # Find case-only renames and transform changes
    case_only_renames = set()
    new_rename_changes = []

    for norm_path, old_path in old_paths_normalized.items():
        if norm_path in new_paths_normalized:
            new_path = new_paths_normalized[norm_path]
            if old_path != new_path:
                # Found a case-only rename
                old_change = old_changes[old_path]
                new_change = new_changes[new_path]

                # Replace the DELETE and ADD/MODIFY pair with a single
                # CHANGE_RENAME. Both cases are built the same way: the old
                # file comes from the DELETE and the new file from the
                # ADD/MODIFY.
                rename_change = TreeChange(
                    CHANGE_RENAME, old_change.old, new_change.new
                )
                new_rename_changes.append(rename_change)

                # Mark the old changes for removal
                case_only_renames.add(old_change)
                case_only_renames.add(new_change)

    # Return new list with original ADD/DELETE changes replaced by renames
    result = [change for change in changes if change not in case_only_renames]
    result.extend(new_rename_changes)
    return result
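

# Illustrative usage sketch (not part of the module): collapsing a
# DELETE + ADD pair that differs only in case into one rename. The
# TreeChange/TreeEntry shapes and the bare ConfigDict are assumptions made
# for the example; the normalizer chosen by get_path_element_normalizer()
# depends on configuration and platform.
#
#   from dulwich.config import ConfigDict
#   from dulwich.diff_tree import CHANGE_ADD, CHANGE_DELETE, TreeChange
#   from dulwich.objects import TreeEntry
#
#   sha = b"0" * 40  # placeholder blob id
#   changes = [
#       TreeChange(CHANGE_DELETE, TreeEntry(b"README.txt", 0o100644, sha), None),
#       TreeChange(CHANGE_ADD, None, TreeEntry(b"readme.txt", 0o100644, sha)),
#   ]
#   result = detect_case_only_renames(changes, ConfigDict())
#   # Where path normalization case-folds, `result` should contain a single
#   # CHANGE_RENAME from README.txt to readme.txt.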


def update_working_tree(
    repo: "Repo",
    old_tree_id: Optional[bytes],
    new_tree_id: bytes,
    change_iterator: Iterator["TreeChange"],
    honor_filemode: bool = True,
    validate_path_element: Optional[Callable[[bytes], bool]] = None,
    symlink_fn: Optional[Callable] = None,
    force_remove_untracked: bool = False,
    blob_normalizer: Optional["BlobNormalizer"] = None,
    tree_encoding: str = "utf-8",
    allow_overwrite_modified: bool = False,
) -> None:
    """Update the working tree and index to match a new tree.

    This function handles:
    - Adding new files
    - Updating modified files
    - Removing deleted files
    - Cleaning up empty directories

    Args:
      repo: Repository object
      old_tree_id: SHA of the tree before the update
      new_tree_id: SHA of the tree to update to
      change_iterator: Iterator of TreeChange objects to apply
      honor_filemode: Whether to honor the core.filemode setting
      validate_path_element: Function to validate path elements to check out
      symlink_fn: Function to use for creating symlinks
      force_remove_untracked: If True, remove files that exist in the working
        directory but not in the target tree, even if old_tree_id is None
      blob_normalizer: An optional BlobNormalizer to use for converting line
        endings when writing blobs to the working directory
      tree_encoding: Encoding used for tree paths (default: utf-8)
      allow_overwrite_modified: If False, raise an error when attempting to
        overwrite files that have been modified compared to old_tree_id
    """
    if validate_path_element is None:
        validate_path_element = validate_path_element_default

    from .diff_tree import (
        CHANGE_ADD,
        CHANGE_COPY,
        CHANGE_DELETE,
        CHANGE_MODIFY,
        CHANGE_RENAME,
        CHANGE_UNCHANGED,
    )

    repo_path = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
    index = repo.open_index()

    # Convert iterator to list since we need multiple passes
    changes = list(change_iterator)

    # Transform case-only renames on case-insensitive filesystems
    import platform

    default_ignore_case = platform.system() in ("Windows", "Darwin")
    config = repo.get_config()
    ignore_case = config.get_boolean((b"core",), b"ignorecase", default_ignore_case)

    if ignore_case:
        changes = detect_case_only_renames(changes, config)

    # Check for path conflicts where files need to become directories
    paths_becoming_dirs = set()
    for change in changes:
        if change.type in (CHANGE_ADD, CHANGE_MODIFY, CHANGE_RENAME, CHANGE_COPY):
            path = change.new.path
            if b"/" in path:  # This is a file inside a directory
                # Check if any parent path exists as a file in the old tree or changes
                parts = path.split(b"/")
                for i in range(1, len(parts)):
                    parent = b"/".join(parts[:i])
                    # See if this parent path is being deleted (was a file, becoming a dir)
                    for other_change in changes:
                        if (
                            other_change.type == CHANGE_DELETE
                            and other_change.old
                            and other_change.old.path == parent
                        ):
                            paths_becoming_dirs.add(parent)

    # Check if any path that needs to become a directory has been modified
    for path in paths_becoming_dirs:
        full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
        try:
            current_stat = os.lstat(full_path)
        except FileNotFoundError:
            continue  # File doesn't exist, nothing to check
        except OSError as e:
            raise OSError(
                f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
            ) from e

        if stat.S_ISREG(current_stat.st_mode):
            # Find the old entry for this path
            old_change = None
            for change in changes:
                if (
                    change.type == CHANGE_DELETE
                    and change.old
                    and change.old.path == path
                ):
                    old_change = change
                    break

            if old_change:
                # Check if file has been modified
                file_matches = _check_file_matches(
                    repo.object_store,
                    full_path,
                    old_change.old.sha,
                    old_change.old.mode,
                    current_stat,
                    honor_filemode,
                    blob_normalizer,
                    path,
                )
                if not file_matches:
                    raise OSError(
                        f"Cannot replace modified file with directory: {path!r}"
                    )

    # Check for uncommitted modifications before making any changes
    if not allow_overwrite_modified and old_tree_id:
        for change in changes:
            # Only check files that are being modified or deleted
            if change.type in (CHANGE_MODIFY, CHANGE_DELETE) and change.old:
                path = change.old.path
                if path.startswith(b".git") or not validate_path(
                    path, validate_path_element
                ):
                    continue

                full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
                try:
                    current_stat = os.lstat(full_path)
                except FileNotFoundError:
                    continue  # File doesn't exist, nothing to check
                except OSError as e:
                    raise OSError(
                        f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                    ) from e

                if stat.S_ISREG(current_stat.st_mode):
                    # Check if working tree file differs from old tree
                    file_matches = _check_file_matches(
                        repo.object_store,
                        full_path,
                        change.old.sha,
                        change.old.mode,
                        current_stat,
                        honor_filemode,
                        blob_normalizer,
                        path,
                    )
                    if not file_matches:
                        from .errors import WorkingTreeModifiedError

                        raise WorkingTreeModifiedError(
                            f"Your local changes to '{path.decode('utf-8', errors='replace')}' "
                            "would be overwritten by checkout. "
                            "Please commit your changes or stash them before you switch branches."
                        )

    # Apply the changes
    for change in changes:
        if change.type in (CHANGE_DELETE, CHANGE_RENAME):
            # Remove the old file/directory
            path = change.old.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                delete_stat: Optional[os.stat_result] = os.lstat(full_path)
            except FileNotFoundError:
                delete_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            _transition_to_absent(repo, path, full_path, delete_stat, index)

        if change.type in (
            CHANGE_ADD,
            CHANGE_MODIFY,
            CHANGE_UNCHANGED,
            CHANGE_COPY,
            CHANGE_RENAME,
        ):
            # Add or modify file
            path = change.new.path
            if path.startswith(b".git") or not validate_path(
                path, validate_path_element
            ):
                continue

            full_path = _tree_to_fs_path(repo_path, path, tree_encoding)
            try:
                modify_stat: Optional[os.stat_result] = os.lstat(full_path)
            except FileNotFoundError:
                modify_stat = None
            except OSError as e:
                raise OSError(
                    f"Cannot access {path.decode('utf-8', errors='replace')}: {e}"
                ) from e

            if S_ISGITLINK(change.new.mode):
                _transition_to_submodule(
                    repo, path, full_path, modify_stat, change.new, index
                )
            else:
                _transition_to_file(
                    repo.object_store,
                    path,
                    full_path,
                    modify_stat,
                    change.new,
                    index,
                    honor_filemode,
                    symlink_fn,
                    blob_normalizer,
                    tree_encoding,
                )

    index.write()
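

# Illustrative usage sketch: driving update_working_tree() with the change
# stream from dulwich.diff_tree.tree_changes(). The repository path and
# branch name are hypothetical.
#
#   from dulwich.diff_tree import tree_changes
#   from dulwich.repo import Repo
#
#   repo = Repo("/path/to/repo")
#   old_tree = repo[repo.head()].tree
#   new_commit = repo.refs[b"refs/heads/feature"]
#   new_tree = repo[new_commit].tree
#   changes = tree_changes(repo.object_store, old_tree, new_tree)
#   update_working_tree(repo, old_tree, new_tree, changes)
#   # With allow_overwrite_modified=False (the default), files carrying
#   # uncommitted modifications raise WorkingTreeModifiedError rather than
#   # being overwritten.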


def get_unstaged_changes(
    index: Index,
    root_path: Union[str, bytes],
    filter_blob_callback: Optional[Callable] = None,
) -> Generator[bytes, None, None]:
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callback applied to each blob before it
        is compared against the index entry
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            if blob.id != entry.sha:
                yield tree_path
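

# Illustrative usage sketch: listing paths whose working-tree contents no
# longer match the index (the repository path is hypothetical).
#
#   from dulwich.repo import Repo
#
#   repo = Repo("/path/to/repo")
#   index = repo.open_index()
#   for tree_path in get_unstaged_changes(index, repo.path):
#       print(tree_path.decode("utf-8", "replace"))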


def _tree_to_fs_path(
    root_path: bytes, tree_path: bytes, tree_encoding: str = "utf-8"
) -> bytes:
    """Convert a git tree path to a file system path.

    Args:
      root_path: Root filesystem path
      tree_path: Git tree path as bytes (encoded with tree_encoding)
      tree_encoding: Encoding used for tree paths (default: utf-8)
    Returns: File system path.
    """
    assert isinstance(tree_path, bytes)
    if os_sep_bytes != b"/":
        sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
    else:
        sep_corrected_path = tree_path

    # On Windows, we need to handle tree path encoding properly
    if sys.platform == "win32":
        # Decode from tree encoding, then re-encode for filesystem
        try:
            tree_path_str = sep_corrected_path.decode(tree_encoding)
            sep_corrected_path = os.fsencode(tree_path_str)
        except UnicodeDecodeError:
            # If decoding fails, use the original bytes
            pass

    return os.path.join(root_path, sep_corrected_path)


def _fs_to_tree_path(fs_path: Union[str, bytes], tree_encoding: str = "utf-8") -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.
      tree_encoding: Encoding to use for tree paths (default: utf-8)
    Returns: Git tree path as bytes (encoded with tree_encoding)
    """
    if not isinstance(fs_path, bytes):
        fs_path_bytes = os.fsencode(fs_path)
    else:
        fs_path_bytes = fs_path

    # On Windows, we need to ensure tree paths are properly encoded
    if sys.platform == "win32":
        try:
            # Decode from filesystem encoding, then re-encode with tree encoding
            fs_path_str = os.fsdecode(fs_path_bytes)
            fs_path_bytes = fs_path_str.encode(tree_encoding)
        except UnicodeDecodeError:
            # If filesystem decoding fails, use the original bytes
            pass

    if os_sep_bytes != b"/":
        tree_path = fs_path_bytes.replace(os_sep_bytes, b"/")
    else:
        tree_path = fs_path_bytes
    return tree_path
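

# Illustrative round trip between the two path conversions above. On Windows
# the separator is translated; elsewhere the bytes pass through unchanged.
#
#   fs_path = _tree_to_fs_path(b"/path/to/repo", b"docs/readme.md")
#   # -> b"/path/to/repo/docs/readme.md" (with b"\\" separators on Windows)
#   _fs_to_tree_path(b"docs/readme.md")  # -> b"docs/readme.md"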


def index_entry_from_directory(st: os.stat_result, path: bytes) -> Optional[IndexEntry]:
    """Create an index entry for a directory, if it is a submodule.

    Returns a gitlink entry when the directory contains a .git file or
    directory with a readable HEAD; otherwise returns None.
    """
    if os.path.exists(os.path.join(path, b".git")):
        head = read_submodule_head(path)
        if head is None:
            return None
        return index_entry_from_stat(st, head, mode=S_IFGITLINK)
    return None


def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index entry from a filesystem path.

    This returns an index entry for files, symlinks and tree references
    (submodules). For directories and non-existent files it returns None.

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to save new blobs in
    Returns: An index entry; None for directories and other unsupported types
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    if stat.S_ISDIR(st.st_mode):
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    return None
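

# Illustrative usage sketch: building an entry for a file on disk (the path
# is hypothetical).
#
#   entry = index_entry_from_path(b"/path/to/repo/setup.py")
#   if entry is not None:
#       print(oct(entry.mode), entry.sha)
#   # For a submodule checkout the entry carries mode == S_IFGITLINK and the
#   # submodule's HEAD sha; for a plain directory the result is None.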


def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry
    """
    for path in paths:
        p = _tree_to_fs_path(root_path, path)
        try:
            entry = index_entry_from_path(p, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            entry = None
        yield path, entry


def iter_fresh_objects(
    paths: Iterable[bytes],
    root_path: bytes,
    include_deleted: bool = False,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is None:
            if include_deleted:
                yield path, None, None
        else:
            yield path, entry.sha, cleanup_mode(entry.mode)
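

# Illustrative usage sketch: inspecting the on-disk versions of a few tracked
# paths while keeping deleted ones visible (paths hypothetical).
#
#   for path, sha, mode in iter_fresh_objects(
#       [b"setup.py", b"missing.txt"], b"/path/to/repo", include_deleted=True
#   ):
#       print(path, sha, mode)
#   # Deleted or unreadable entries are reported as (path, None, None).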


def refresh_index(index: Index, root_path: bytes) -> None:
    """Refresh the contents of an index.

    This is the equivalent of the index refresh performed by 'git commit -a':
    each tracked path is re-read from the working tree and its entry updated.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry
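

# Illustrative usage sketch: refreshing an index from the working tree and
# writing it back (the repository path is hypothetical).
#
#   from dulwich.repo import Repo
#
#   repo = Repo("/path/to/repo")
#   index = repo.open_index()
#   root = repo.path if isinstance(repo.path, bytes) else repo.path.encode()
#   refresh_index(index, root)
#   index.write()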


class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.
    """

    _file: "_GitFile"

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self) -> Index:
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        if exc_type is not None:
            self._file.abort()
            return
        try:
            # BinaryIO and cast are already imported at module scope.
            f = SHA1Writer(cast(BinaryIO, self._file))
            write_index_dict(cast(BinaryIO, f), self._index._byname)
        except BaseException:
            self._file.abort()
        else:
            f.close()
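

# Illustrative usage sketch of the context manager above: the index file is
# locked for the duration of the block and rewritten on a clean exit; if an
# exception escapes, the lock file is aborted and the old index is untouched.
#
#   with locked_index(b"/path/to/repo/.git/index") as index:
#       del index[b"obsolete.txt"]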