Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/index.py: 51%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# index.py -- File parser/writer for the git index file
2# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
5# General Public License as public by the Free Software Foundation; version 2.0
6# or (at your option) any later version. You can redistribute it and/or
7# modify it under the terms of either of these two licenses.
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15# You should have received a copy of the licenses; if not, see
16# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
17# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
18# License, Version 2.0.
19#
21"""Parser for the git index file format."""
23import os
24import stat
25import struct
26import sys
27from dataclasses import dataclass
28from enum import Enum
29from typing import (
30 Any,
31 BinaryIO,
32 Callable,
33 Dict,
34 Iterable,
35 Iterator,
36 List,
37 Optional,
38 Tuple,
39 Union,
40)
42from .file import GitFile
43from .object_store import iter_tree_contents
44from .objects import (
45 S_IFGITLINK,
46 S_ISGITLINK,
47 Blob,
48 ObjectID,
49 Tree,
50 hex_to_sha,
51 sha_to_hex,
52)
53from .pack import ObjectContainer, SHA1Reader, SHA1Writer
55# 2-bit stage (during merge)
56FLAG_STAGEMASK = 0x3000
57FLAG_STAGESHIFT = 12
58FLAG_NAMEMASK = 0x0FFF
60# assume-valid
61FLAG_VALID = 0x8000
63# extended flag (must be zero in version 2)
64FLAG_EXTENDED = 0x4000
66# used by sparse checkout
67EXTENDED_FLAG_SKIP_WORKTREE = 0x4000
69# used by "git add -N"
70EXTENDED_FLAG_INTEND_TO_ADD = 0x2000
72DEFAULT_VERSION = 2
class Stage(Enum):
    """Merge stage of an index entry, as stored in the flags field.

    Stage 0 (NORMAL) is a regular, merged entry; stages 1-3 hold the
    common-ancestor, this-branch and other-branch versions of a path
    that is in a conflicted state during a merge.
    """

    NORMAL = 0
    MERGE_CONFLICT_ANCESTOR = 1
    MERGE_CONFLICT_THIS = 2
    MERGE_CONFLICT_OTHER = 3
@dataclass
class SerializedIndexEntry:
    """An index entry in (close to) its on-disk form.

    Carries the path name, the stat information git uses for freshness
    checks, the object sha (hex) and the raw flag words.  ``flags``
    contains the stage bits (see FLAG_STAGEMASK); ``extended_flags``
    is only meaningful for index format version >= 3.
    """

    name: bytes
    # ctime/mtime accept an int, a float, or a (seconds, nanoseconds)
    # tuple -- see write_cache_time for the normalization rules.
    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes
    flags: int
    extended_flags: int

    def stage(self) -> Stage:
        """Return the merge stage encoded in the flags word."""
        return Stage((self.flags & FLAG_STAGEMASK) >> FLAG_STAGESHIFT)
@dataclass
class IndexEntry:
    """In-memory form of an index entry.

    Unlike SerializedIndexEntry this has no name or flag words: the
    path is the key of the containing dictionary, and the stage is
    supplied when converting back with serialize().
    """

    ctime: Union[int, float, Tuple[int, int]]
    mtime: Union[int, float, Tuple[int, int]]
    dev: int
    ino: int
    mode: int
    uid: int
    gid: int
    size: int
    sha: bytes

    @classmethod
    def from_serialized(cls, serialized: SerializedIndexEntry) -> "IndexEntry":
        """Build an IndexEntry from an on-disk SerializedIndexEntry.

        The name, flags and extended_flags fields are dropped.
        """
        return cls(
            ctime=serialized.ctime,
            mtime=serialized.mtime,
            dev=serialized.dev,
            ino=serialized.ino,
            mode=serialized.mode,
            uid=serialized.uid,
            gid=serialized.gid,
            size=serialized.size,
            sha=serialized.sha,
        )

    def serialize(self, name: bytes, stage: Stage) -> SerializedIndexEntry:
        """Convert back to on-disk form under the given name and stage.

        Only the stage bits are set in flags; extended_flags is zeroed.
        """
        return SerializedIndexEntry(
            name=name,
            ctime=self.ctime,
            mtime=self.mtime,
            dev=self.dev,
            ino=self.ino,
            mode=self.mode,
            uid=self.uid,
            gid=self.gid,
            size=self.size,
            sha=self.sha,
            flags=stage.value << FLAG_STAGESHIFT,
            extended_flags=0,
        )
class ConflictedIndexEntry:
    """Index entry that represents a conflict.

    Groups up to three IndexEntry versions of the same path: the common
    ancestor (stage 1), this branch's version (stage 2) and the other
    branch's version (stage 3).  Sides that are absent are None.
    """

    ancestor: Optional[IndexEntry]
    this: Optional[IndexEntry]
    other: Optional[IndexEntry]

    def __init__(
        self,
        ancestor: Optional[IndexEntry] = None,
        this: Optional[IndexEntry] = None,
        other: Optional[IndexEntry] = None,
    ) -> None:
        self.ancestor = ancestor
        self.this = this
        self.other = other
class UnmergedEntries(Exception):
    """Unmerged entries exist in the index.

    Raised by operations that need a fully merged index (e.g.
    Index.get_sha1, Index.get_mode, Index.iterobjects) when a path
    still maps to a ConflictedIndexEntry.
    """
def pathsplit(path: bytes) -> Tuple[bytes, bytes]:
    """Split a /-delimited path into a directory part and a basename.

    Args:
      path: The path to split.

    Returns:
      Tuple with directory name and basename; the directory name is
      empty when the path has no slash.
    """
    # rpartition yields (b"", b"", path) when there is no separator,
    # which collapses to the desired (b"", path) result.
    dirname, _, basename = path.rpartition(b"/")
    return (dirname, basename)
def pathjoin(*args):
    """Join a /-delimited path, skipping empty components."""
    return b"/".join(filter(None, args))
def read_cache_time(f):
    """Read a cache time.

    Args:
      f: File-like object to read from
    Returns:
      Tuple with seconds and nanoseconds
    """
    # On disk a cache time is two big-endian 32-bit words: secs, nsecs.
    raw = f.read(8)
    return struct.unpack(">LL", raw)
def write_cache_time(f, t):
    """Write a cache time.

    Args:
      f: File-like object to write to
      t: Time to write (as int, float or tuple with secs and nsecs)
    """
    # Normalize to a (seconds, nanoseconds) pair before packing.
    if isinstance(t, int):
        t = (t, 0)
    elif isinstance(t, float):
        whole, frac = divmod(t, 1.0)
        t = (int(whole), int(frac * 1000000000))
    elif not isinstance(t, tuple):
        raise TypeError(t)
    f.write(struct.pack(">LL", *t))
def read_cache_entry(f, version: int) -> SerializedIndexEntry:
    """Read an entry from a cache file.

    Args:
      f: File-like object to read from
      version: Index format version (affects extended flags and padding)

    Returns:
      A SerializedIndexEntry; the returned flags have the name-length
      bits masked out, and the sha is converted to hex.
    """
    beginoffset = f.tell()
    ctime = read_cache_time(f)
    mtime = read_cache_time(f)
    (
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha,
        flags,
    ) = struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
    if flags & FLAG_EXTENDED:
        # A second 16-bit flags word follows, but only in version >= 3.
        if version < 3:
            raise AssertionError("extended flag set in index with version < 3")
        (extended_flags,) = struct.unpack(">H", f.read(2))
    else:
        extended_flags = 0
    # The low 12 bits of flags give the name length.
    name = f.read(flags & FLAG_NAMEMASK)
    # Padding:
    if version < 4:
        # Entries are padded with NULs to a multiple of 8 bytes
        # (measured from the start of the entry, name included).
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.read((beginoffset + real_size) - f.tell())
    return SerializedIndexEntry(
        name,
        ctime,
        mtime,
        dev,
        ino,
        mode,
        uid,
        gid,
        size,
        sha_to_hex(sha),
        flags & ~FLAG_NAMEMASK,
        extended_flags,
    )
def write_cache_entry(f, entry: SerializedIndexEntry, version: int) -> None:
    """Write an index entry to a file.

    Args:
      f: File object
      entry: SerializedIndexEntry to write
      version: Index format version being written

    Raises:
      AssertionError: if extended flags are used with version < 3
    """
    beginoffset = f.tell()
    write_cache_time(f, entry.ctime)
    write_cache_time(f, entry.mtime)
    # Low 12 bits of the flags word carry the name length; the higher
    # bits (stage, valid, extended) come from the entry.  NOTE(review):
    # names longer than 0xFFF bytes are not specially handled here.
    flags = len(entry.name) | (entry.flags & ~FLAG_NAMEMASK)
    if entry.extended_flags:
        flags |= FLAG_EXTENDED
    if flags & FLAG_EXTENDED and version is not None and version < 3:
        raise AssertionError("unable to use extended flags in version < 3")
    f.write(
        struct.pack(
            b">LLLLLL20sH",
            entry.dev & 0xFFFFFFFF,
            entry.ino & 0xFFFFFFFF,
            entry.mode,
            entry.uid,
            entry.gid,
            # Like dev/ino, the on-disk size field is a 32-bit stat hint;
            # truncate so files >= 4 GiB don't overflow struct.pack.
            entry.size & 0xFFFFFFFF,
            hex_to_sha(entry.sha),
            flags,
        )
    )
    if flags & FLAG_EXTENDED:
        f.write(struct.pack(b">H", entry.extended_flags))
    f.write(entry.name)
    if version < 4:
        # Pad the whole entry (name included) to a multiple of 8 bytes.
        real_size = (f.tell() - beginoffset + 8) & ~7
        f.write(b"\0" * ((beginoffset + real_size) - f.tell()))
class UnsupportedIndexFormat(Exception):
    """An unsupported index format was encountered."""

    def __init__(self, version) -> None:
        # Version number found in the index header; only 1-3 are read.
        self.index_format_version = version
def read_index(f: BinaryIO) -> Iterator[SerializedIndexEntry]:
    """Read an index file, yielding the individual entries."""
    header = f.read(4)
    if header != b"DIRC":
        raise AssertionError(f"Invalid index file header: {header!r}")
    version, num_entries = struct.unpack(b">LL", f.read(4 * 2))
    if version not in (1, 2, 3):
        raise UnsupportedIndexFormat(version)
    # Entries follow the header back-to-back.
    for _ in range(num_entries):
        yield read_cache_entry(f, version)
def read_index_dict(f) -> Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]:
    """Read an index file and return it as a dictionary keyed by path.

    Stage-0 entries map to an IndexEntry; conflicted paths map to a
    ConflictedIndexEntry that groups the ancestor/this/other stages.

    Args:
      f: File object to read from.
    """
    ret: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]] = {}
    for serialized in read_index(f):
        stage = serialized.stage()
        if stage == Stage.NORMAL:
            ret[serialized.name] = IndexEntry.from_serialized(serialized)
            continue
        existing = ret.setdefault(serialized.name, ConflictedIndexEntry())
        if isinstance(existing, IndexEntry):
            raise AssertionError(f"Non-conflicted entry for {serialized.name!r} exists")
        side = IndexEntry.from_serialized(serialized)
        if stage == Stage.MERGE_CONFLICT_ANCESTOR:
            existing.ancestor = side
        elif stage == Stage.MERGE_CONFLICT_THIS:
            existing.this = side
        elif stage == Stage.MERGE_CONFLICT_OTHER:
            existing.other = side
    return ret
def write_index(
    f: BinaryIO, entries: List[SerializedIndexEntry], version: Optional[int] = None
):
    """Write an index file.

    Args:
      f: File-like object to write to
      version: Version number to write; defaults to DEFAULT_VERSION
      entries: Iterable over the entries to write
    """
    effective_version = DEFAULT_VERSION if version is None else version
    # Header: magic, format version, entry count.
    f.write(b"DIRC")
    f.write(struct.pack(b">LL", effective_version, len(entries)))
    for entry in entries:
        write_cache_entry(f, entry, effective_version)
def write_index_dict(
    f: BinaryIO,
    entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]],
    version: Optional[int] = None,
) -> None:
    """Write an index file based on the contents of a dictionary.

    Entries are emitted sorted by path, and conflicted paths emit
    their stages in ancestor/this/other order.
    """
    conflict_slots = (
        ("ancestor", Stage.MERGE_CONFLICT_ANCESTOR),
        ("this", Stage.MERGE_CONFLICT_THIS),
        ("other", Stage.MERGE_CONFLICT_OTHER),
    )
    serialized_entries = []
    for path in sorted(entries):
        value = entries[path]
        if isinstance(value, ConflictedIndexEntry):
            for attr, stage in conflict_slots:
                side = getattr(value, attr)
                if side is not None:
                    serialized_entries.append(side.serialize(path, stage))
        else:
            serialized_entries.append(value.serialize(path, Stage.NORMAL))
    write_index(f, serialized_entries, version=version)
def cleanup_mode(mode: int) -> int:
    """Cleanup a mode value.

    This will return a mode that can be stored in a tree object.

    Args:
      mode: Mode to clean up.

    Returns:
      One of the modes git records in trees: symlink, directory,
      gitlink, or a regular file with only the executable bit kept.
    """
    if stat.S_ISLNK(mode):
        return stat.S_IFLNK
    if stat.S_ISDIR(mode):
        return stat.S_IFDIR
    if S_ISGITLINK(mode):
        return S_IFGITLINK
    # Regular file: normalize permissions to 644 or 755.
    return stat.S_IFREG | (0o755 if mode & 0o100 else 0o644)
class Index:
    """A Git Index file."""

    # Maps path -> entry; conflicted paths hold a ConflictedIndexEntry.
    _byname: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]

    def __init__(self, filename: Union[bytes, str], read=True) -> None:
        """Create an index object associated with the given filename.

        Args:
          filename: Path to the index file
          read: Whether to initialize the index from the given file, should it exist.
        """
        self._filename = filename
        # TODO(jelmer): Store the version returned by read_index
        self._version = None
        self.clear()
        if read:
            self.read()

    @property
    def path(self):
        """Path of the index file on disk."""
        return self._filename

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._filename!r})"

    def write(self) -> None:
        """Write current contents of index to disk."""
        f = GitFile(self._filename, "wb")
        try:
            # SHA1Writer appends the trailing checksum required by the
            # index format when closed.
            f = SHA1Writer(f)
            write_index_dict(f, self._byname, version=self._version)
        finally:
            f.close()

    def read(self):
        """Read current contents of index from disk."""
        if not os.path.exists(self._filename):
            # No index file yet: leave the in-memory index empty.
            return
        f = GitFile(self._filename, "rb")
        try:
            f = SHA1Reader(f)
            self.update(read_index_dict(f))
            # FIXME: Additional data?
            # Consume everything up to the trailing 20-byte checksum
            # (extension sections are skipped, not parsed) so the
            # checksum can be verified.
            f.read(os.path.getsize(self._filename) - f.tell() - 20)
            f.check_sha()
        finally:
            f.close()

    def __len__(self) -> int:
        """Number of entries in this index file."""
        return len(self._byname)

    def __getitem__(self, key: bytes) -> Union[IndexEntry, ConflictedIndexEntry]:
        """Retrieve entry by relative path and stage.

        Returns: Either a IndexEntry or a ConflictedIndexEntry
        Raises KeyError: if the entry does not exist
        """
        return self._byname[key]

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over the paths and stages in this index."""
        return iter(self._byname)

    def __contains__(self, key):
        return key in self._byname

    def get_sha1(self, path: bytes) -> bytes:
        """Return the (git object) SHA1 for the object at a path.

        Raises UnmergedEntries: if the path is conflicted.
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.sha

    def get_mode(self, path: bytes) -> int:
        """Return the POSIX file mode for the object at a path.

        Raises UnmergedEntries: if the path is conflicted.
        """
        value = self[path]
        if isinstance(value, ConflictedIndexEntry):
            raise UnmergedEntries
        return value.mode

    def iterobjects(self) -> Iterable[Tuple[bytes, bytes, int]]:
        """Iterate over path, sha, mode tuples for use with commit_tree.

        Raises UnmergedEntries: if any path is conflicted.
        """
        for path in self:
            entry = self[path]
            if isinstance(entry, ConflictedIndexEntry):
                raise UnmergedEntries
            yield path, entry.sha, cleanup_mode(entry.mode)

    def has_conflicts(self) -> bool:
        """Return True if any path in the index is conflicted."""
        for value in self._byname.values():
            if isinstance(value, ConflictedIndexEntry):
                return True
        return False

    def clear(self):
        """Remove all contents from this index."""
        self._byname = {}

    def __setitem__(
        self, name: bytes, value: Union[IndexEntry, ConflictedIndexEntry]
    ) -> None:
        assert isinstance(name, bytes)
        self._byname[name] = value

    def __delitem__(self, name: bytes) -> None:
        del self._byname[name]

    def iteritems(
        self,
    ) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs; same as items()."""
        return iter(self._byname.items())

    def items(self) -> Iterator[Tuple[bytes, Union[IndexEntry, ConflictedIndexEntry]]]:
        """Iterate over (path, entry) pairs."""
        return iter(self._byname.items())

    def update(self, entries: Dict[bytes, Union[IndexEntry, ConflictedIndexEntry]]):
        """Add or replace entries from the given mapping."""
        for key, value in entries.items():
            self[key] = value

    def paths(self):
        """Yield all paths in the index."""
        yield from self._byname.keys()

    def changes_from_tree(
        self, object_store, tree: ObjectID, want_unchanged: bool = False
    ):
        """Find the differences between the contents of this index and a tree.

        Args:
          object_store: Object store to use for retrieving tree contents
          tree: SHA1 of the root tree
          want_unchanged: Whether unchanged files should be reported
        Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
            newmode), (oldsha, newsha)
        """

        def lookup_entry(path):
            # Adapter giving changes_from_tree a (sha, mode) view of us.
            entry = self[path]
            return entry.sha, cleanup_mode(entry.mode)

        yield from changes_from_tree(
            self.paths(),
            lookup_entry,
            object_store,
            tree,
            want_unchanged=want_unchanged,
        )

    def commit(self, object_store):
        """Create a new tree from an index.

        Args:
          object_store: Object store to save the tree in
        Returns:
          Root tree SHA
        """
        return commit_tree(object_store, self.iterobjects())
def commit_tree(
    object_store: ObjectContainer, blobs: Iterable[Tuple[bytes, bytes, int]]
) -> bytes:
    """Commit a new tree.

    Args:
      object_store: Object store to add trees to
      blobs: Iterable over blob path, sha, mode entries
    Returns:
      SHA1 of the created tree.
    """
    # Nested-dict mirror of the final tree structure: a dict value is a
    # subdirectory, a (mode, sha) tuple is a blob entry.
    trees: Dict[bytes, Any] = {b"": {}}

    def add_tree(path):
        # Return the dict for `path`, creating any missing parents on
        # the way up via recursion.
        if path in trees:
            return trees[path]
        dirname, basename = pathsplit(path)
        t = add_tree(dirname)
        assert isinstance(basename, bytes)
        newtree = {}
        t[basename] = newtree
        trees[path] = newtree
        return newtree

    for path, sha, mode in blobs:
        tree_path, basename = pathsplit(path)
        tree = add_tree(tree_path)
        tree[basename] = (mode, sha)

    def build_tree(path):
        # Post-order traversal: subtrees are built and stored before
        # their parent, so every referenced sha already exists in the
        # object store when the parent tree is added.
        tree = Tree()
        for basename, entry in trees[path].items():
            if isinstance(entry, dict):
                mode = stat.S_IFDIR
                sha = build_tree(pathjoin(path, basename))
            else:
                (mode, sha) = entry
            tree.add(basename, mode, sha)
        object_store.add_object(tree)
        return tree.id

    return build_tree(b"")
def commit_index(object_store: ObjectContainer, index: Index) -> bytes:
    """Create a new tree from an index.

    Args:
      object_store: Object store to save the tree in
      index: Index file
    Note: This function is deprecated, use index.commit() instead.
    Returns: Root tree sha.
    """
    # Kept for backward compatibility; simply delegates to commit_tree.
    return commit_tree(object_store, index.iterobjects())
def changes_from_tree(
    names: Iterable[bytes],
    lookup_entry: Callable[[bytes], Tuple[bytes, int]],
    object_store: ObjectContainer,
    tree: Optional[bytes],
    want_unchanged=False,
) -> Iterable[
    Tuple[
        Tuple[Optional[bytes], Optional[bytes]],
        Tuple[Optional[int], Optional[int]],
        Tuple[Optional[bytes], Optional[bytes]],
    ]
]:
    """Find the differences between the contents of a tree and
    a working copy.

    Args:
      names: Iterable of names in the working copy
      lookup_entry: Function to lookup an entry in the working copy;
        raises KeyError when the name does not exist there
      object_store: Object store to use for retrieving tree contents
      tree: SHA1 of the root tree, or None for an empty tree
      want_unchanged: Whether unchanged files should be reported
    Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
        (oldsha, newsha)
    """
    # TODO(jelmer): Support a include_trees option

    # Names left over after walking the tree are additions.
    other_names = set(names)

    if tree is not None:
        for name, mode, sha in iter_tree_contents(object_store, tree):
            try:
                (other_sha, other_mode) = lookup_entry(name)
            except KeyError:
                # Was removed
                yield ((name, None), (mode, None), (sha, None))
            else:
                other_names.remove(name)
                if want_unchanged or other_sha != sha or other_mode != mode:
                    yield ((name, name), (mode, other_mode), (sha, other_sha))

    # Mention added files
    for name in other_names:
        try:
            (other_sha, other_mode) = lookup_entry(name)
        except KeyError:
            # Listed but no longer resolvable: skip silently.
            pass
        else:
            yield ((None, name), (None, other_mode), (None, other_sha))
def index_entry_from_stat(
    stat_val,
    hex_sha: bytes,
    mode: Optional[int] = None,
):
    """Create a new index entry from a stat value.

    Args:
      stat_val: POSIX stat_result instance
      hex_sha: Hex sha of the object
      mode: Optional file mode; derived from st_mode when omitted
    """
    entry_mode = cleanup_mode(stat_val.st_mode) if mode is None else mode
    return IndexEntry(
        ctime=stat_val.st_ctime,
        mtime=stat_val.st_mtime,
        dev=stat_val.st_dev,
        ino=stat_val.st_ino,
        mode=entry_mode,
        uid=stat_val.st_uid,
        gid=stat_val.st_gid,
        size=stat_val.st_size,
        sha=hex_sha,
    )
if sys.platform == "win32":
    # On Windows, creating symlinks either requires administrator privileges
    # or developer mode. Raise a more helpful error when we're unable to
    # create symlinks

    # https://github.com/jelmer/dulwich/issues/1005

    class WindowsSymlinkPermissionError(PermissionError):
        """PermissionError with a hint about enabling developer mode."""

        def __init__(self, errno, msg, filename) -> None:
            # NOTE(review): super(PermissionError, ...) deliberately starts
            # MRO lookup above PermissionError (i.e. at OSError) -- see the
            # linked issue for the motivation; confirm before changing.
            super(PermissionError, self).__init__(
                errno,
                "Unable to create symlink; "
                f"do you have developer mode enabled? {msg}",
                filename,
            )

    def symlink(src, dst, target_is_directory=False, *, dir_fd=None):
        """os.symlink wrapper that raises the friendlier error above."""
        try:
            return os.symlink(
                src, dst, target_is_directory=target_is_directory, dir_fd=dir_fd
            )
        except PermissionError as e:
            raise WindowsSymlinkPermissionError(e.errno, e.strerror, e.filename) from e
else:
    # On other platforms os.symlink is used as-is.
    symlink = os.symlink
def build_file_from_blob(
    blob: Blob,
    mode: int,
    target_path: bytes,
    *,
    honor_filemode=True,
    tree_encoding="utf-8",
    symlink_fn=None,
):
    """Build a file or symlink on disk based on a Git object.

    Args:
      blob: The git object
      mode: File mode
      target_path: Path to write to
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      tree_encoding: Encoding used to decode symlink targets on Windows
      symlink_fn: Function to use for creating symlinks
    Returns: stat object for the file
    """
    try:
        oldstat = os.lstat(target_path)
    except FileNotFoundError:
        oldstat = None
    contents = blob.as_raw_string()
    if stat.S_ISLNK(mode):
        if oldstat:
            # Symlinks cannot be overwritten in place; remove first.
            os.unlink(target_path)
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            contents = contents.decode(tree_encoding)  # type: ignore
            target_path = target_path.decode(tree_encoding)  # type: ignore
        (symlink_fn or symlink)(contents, target_path)
    else:
        if oldstat is not None and oldstat.st_size == len(contents):
            # Short-circuit: if the on-disk bytes already match, keep the
            # existing file (and its stat information) untouched.
            with open(target_path, "rb") as f:
                if f.read() == contents:
                    return oldstat

        with open(target_path, "wb") as f:
            # Write out file
            f.write(contents)

        if honor_filemode:
            os.chmod(target_path, mode)

    return os.lstat(target_path)
# Path elements that must never be checked out, case-insensitively.
INVALID_DOTNAMES = (b".git", b".", b"..", b"")


def validate_path_element_default(element: bytes) -> bool:
    """Return False for path elements that could escape the work tree
    or collide with the .git directory (case-insensitive)."""
    lowered = element.lower()
    return lowered not in INVALID_DOTNAMES
def validate_path_element_ntfs(element: bytes) -> bool:
    """Like validate_path_element_default, but also reject NTFS tricks:
    trailing dots/spaces are ignored by NTFS, and GIT~1 is the 8.3
    short name for .git."""
    stripped = element.rstrip(b". ").lower()
    if stripped in INVALID_DOTNAMES:
        return False
    return stripped != b"git~1"
def validate_path(path: bytes, element_validator=validate_path_element_default) -> bool:
    """Default path validator that just checks for .git/.

    Every /-separated element must pass the element validator.
    """
    return all(element_validator(p) for p in path.split(b"/"))
def build_index_from_tree(
    root_path: Union[str, bytes],
    index_path: Union[str, bytes],
    object_store: ObjectContainer,
    tree_id: bytes,
    honor_filemode: bool = True,
    validate_path_element=validate_path_element_default,
    symlink_fn=None,
):
    """Generate and materialize index from a tree.

    Args:
      tree_id: Tree to materialize
      root_path: Target dir for materialized index files
      index_path: Target path for generated index
      object_store: Non-empty object store holding tree contents
      honor_filemode: An optional flag to honor core.filemode setting in
        config file, default is core.filemode=True, change executable bit
      validate_path_element: Function to validate path elements to check
        out; default just refuses .git and .. directories.
      symlink_fn: Function to use for creating symlinks

    Note: existing index is wiped and contents are not merged
        in a working dir. Suitable only for fresh clones.
    """
    index = Index(index_path, read=False)
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for entry in iter_tree_contents(object_store, tree_id):
        # Silently skip paths that fail validation (e.g. ".git" parts).
        if not validate_path(entry.path, validate_path_element):
            continue
        full_path = _tree_to_fs_path(root_path, entry.path)

        if not os.path.exists(os.path.dirname(full_path)):
            os.makedirs(os.path.dirname(full_path))

        # TODO(jelmer): Merge new index into working tree
        if S_ISGITLINK(entry.mode):
            # Submodule: only create an empty directory placeholder.
            if not os.path.isdir(full_path):
                os.mkdir(full_path)
            st = os.lstat(full_path)
            # TODO(jelmer): record and return submodule paths
        else:
            obj = object_store[entry.sha]
            assert isinstance(obj, Blob)
            st = build_file_from_blob(
                obj,
                entry.mode,
                full_path,
                honor_filemode=honor_filemode,
                symlink_fn=symlink_fn,
            )

        # Add file to index
        if not honor_filemode or S_ISGITLINK(entry.mode):
            # we can not use tuple slicing to build a new tuple,
            # because on windows that will convert the times to
            # longs, which causes errors further along
            st_tuple = (
                entry.mode,
                st.st_ino,
                st.st_dev,
                st.st_nlink,
                st.st_uid,
                st.st_gid,
                st.st_size,
                st.st_atime,
                st.st_mtime,
                st.st_ctime,
            )
            st = st.__class__(st_tuple)
        # default to a stage 0 index entry (normal)
        # when reading from the filesystem
        index[entry.path] = index_entry_from_stat(st, entry.sha)

    index.write()
def blob_from_path_and_mode(fs_path: bytes, mode: int, tree_encoding="utf-8"):
    """Create a blob from a path and a file mode.

    Args:
      fs_path: Full file system path to file
      mode: File mode (only used to detect symlinks)
      tree_encoding: Encoding used to encode symlink targets on Windows
    Returns: A `Blob` object
    """
    assert isinstance(fs_path, bytes)
    blob = Blob()
    if stat.S_ISLNK(mode):
        if sys.platform == "win32":
            # os.readlink on Python3 on Windows requires a unicode string.
            blob.data = os.readlink(os.fsdecode(fs_path)).encode(tree_encoding)
        else:
            blob.data = os.readlink(fs_path)
    else:
        with open(fs_path, "rb") as f:
            blob.data = f.read()
    return blob
def blob_from_path_and_stat(fs_path: bytes, st, tree_encoding="utf-8"):
    """Create a blob from a path and a stat object.

    Args:
      fs_path: Full file system path to file
      st: A stat object
      tree_encoding: Encoding used for symlink targets on Windows
    Returns: A `Blob` object
    """
    # Only st_mode is consulted; delegate to the mode-based helper.
    return blob_from_path_and_mode(fs_path, st.st_mode, tree_encoding)
def read_submodule_head(path: Union[str, bytes]) -> Optional[bytes]:
    """Read the head commit of a submodule.

    Args:
      path: path to the submodule
    Returns: HEAD sha, None if not a valid head/repository
    """
    # Imported lazily to avoid a module-level import cycle with repo.
    from .errors import NotGitRepository
    from .repo import Repo

    # Repo currently expects a "str", so decode if necessary.
    # TODO(jelmer): Perhaps move this into Repo() ?
    if not isinstance(path, str):
        path = os.fsdecode(path)
    try:
        repo = Repo(path)
    except NotGitRepository:
        return None
    try:
        return repo.head()
    except KeyError:
        # Repository exists but has no HEAD (e.g. unborn branch).
        return None
942def _has_directory_changed(tree_path: bytes, entry):
943 """Check if a directory has changed after getting an error.
945 When handling an error trying to create a blob from a path, call this
946 function. It will check if the path is a directory. If it's a directory
947 and a submodule, check the submodule head to see if it's has changed. If
948 not, consider the file as changed as Git tracked a file and not a
949 directory.
951 Return true if the given path should be considered as changed and False
952 otherwise or if the path is not a directory.
953 """
954 # This is actually a directory
955 if os.path.exists(os.path.join(tree_path, b".git")):
956 # Submodule
957 head = read_submodule_head(tree_path)
958 if entry.sha != head:
959 return True
960 else:
961 # The file was changed to a directory, so consider it removed.
962 return True
964 return False
def get_unstaged_changes(
    index: Index, root_path: Union[str, bytes], filter_blob_callback=None
):
    """Walk through an index and check for differences against working tree.

    Args:
      index: index to check
      root_path: path in which to find files
      filter_blob_callback: Optional callable applied to each blob read
        from disk (receives blob and tree path) before comparing shas
    Returns: iterator over paths with unstaged changes
    """
    # For each entry in the index check the sha1 & ensure not staged
    if not isinstance(root_path, bytes):
        root_path = os.fsencode(root_path)

    for tree_path, entry in index.iteritems():
        full_path = _tree_to_fs_path(root_path, tree_path)
        if isinstance(entry, ConflictedIndexEntry):
            # Conflicted files are always unstaged
            yield tree_path
            continue

        try:
            st = os.lstat(full_path)
            if stat.S_ISDIR(st.st_mode):
                # Tracked file replaced by a directory (or submodule).
                if _has_directory_changed(tree_path, entry):
                    yield tree_path
                continue

            if not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                # Sockets, fifos etc. are skipped rather than compared.
                continue

            blob = blob_from_path_and_stat(full_path, st)

            if filter_blob_callback is not None:
                blob = filter_blob_callback(blob, tree_path)
        except FileNotFoundError:
            # The file was removed, so we assume that counts as
            # different from whatever file used to exist.
            yield tree_path
        else:
            # Content differs from the recorded object -> unstaged.
            if blob.id != entry.sha:
                yield tree_path
1011os_sep_bytes = os.sep.encode("ascii")
1014def _tree_to_fs_path(root_path: bytes, tree_path: bytes):
1015 """Convert a git tree path to a file system path.
1017 Args:
1018 root_path: Root filesystem path
1019 tree_path: Git tree path as bytes
1021 Returns: File system path.
1022 """
1023 assert isinstance(tree_path, bytes)
1024 if os_sep_bytes != b"/":
1025 sep_corrected_path = tree_path.replace(b"/", os_sep_bytes)
1026 else:
1027 sep_corrected_path = tree_path
1028 return os.path.join(root_path, sep_corrected_path)
def _fs_to_tree_path(fs_path: Union[str, bytes]) -> bytes:
    """Convert a file system path to a git tree path.

    Args:
      fs_path: File system path.

    Returns: Git tree path as bytes
    """
    raw = fs_path if isinstance(fs_path, bytes) else os.fsencode(fs_path)
    if os_sep_bytes == b"/":
        return raw
    # Normalize the native separator back to git's "/".
    return raw.replace(os_sep_bytes, b"/")
def index_entry_from_directory(st, path: bytes) -> Optional[IndexEntry]:
    """Return a gitlink IndexEntry if `path` is a submodule, else None.

    A directory only gets an entry when it contains a .git and its
    HEAD can be resolved.
    """
    if not os.path.exists(os.path.join(path, b".git")):
        return None
    head = read_submodule_head(path)
    if head is None:
        return None
    return index_entry_from_stat(st, head, mode=S_IFGITLINK)
def index_entry_from_path(
    path: bytes, object_store: Optional[ObjectContainer] = None
) -> Optional[IndexEntry]:
    """Create an index from a filesystem path.

    This returns an index value for files, symlinks
    and tree references. for directories and
    non-existent files it returns None

    Args:
      path: Path to create an index entry for
      object_store: Optional object store to
        save new blobs in
    Returns: An index entry; None for directories
    """
    assert isinstance(path, bytes)
    st = os.lstat(path)
    file_mode = st.st_mode
    if stat.S_ISDIR(file_mode):
        # Directories are only indexed when they are submodules.
        return index_entry_from_directory(st, path)

    if stat.S_ISREG(file_mode) or stat.S_ISLNK(file_mode):
        blob = blob_from_path_and_stat(path, st)
        if object_store is not None:
            object_store.add_object(blob)
        return index_entry_from_stat(st, blob.id)

    # Sockets, fifos and other special files are not indexable.
    return None
def iter_fresh_entries(
    paths: Iterable[bytes],
    root_path: bytes,
    object_store: Optional[ObjectContainer] = None,
) -> Iterator[Tuple[bytes, Optional[IndexEntry]]]:
    """Iterate over current versions of index entries on disk.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      object_store: Optional store to save new blobs in
    Returns: Iterator over path, index_entry (entry is None when the
        path is missing or not indexable)
    """
    for tree_path in paths:
        fs_path = _tree_to_fs_path(root_path, tree_path)
        try:
            fresh_entry = index_entry_from_path(fs_path, object_store=object_store)
        except (FileNotFoundError, IsADirectoryError):
            # Path vanished or turned into a directory: report as gone.
            fresh_entry = None
        yield tree_path, fresh_entry
def iter_fresh_objects(
    paths: Iterable[bytes], root_path: bytes, include_deleted=False, object_store=None
) -> Iterator[Tuple[bytes, Optional[bytes], Optional[int]]]:
    """Iterate over versions of objects on disk referenced by index.

    Args:
      paths: Paths to iterate over
      root_path: Root path to access from
      include_deleted: Include deleted entries with sha and
        mode set to None
      object_store: Optional object store to report new items to
    Returns: Iterator over path, sha, mode
    """
    for path, entry in iter_fresh_entries(paths, root_path, object_store=object_store):
        if entry is not None:
            yield path, entry.sha, cleanup_mode(entry.mode)
        elif include_deleted:
            yield path, None, None
def refresh_index(index: Index, root_path: bytes):
    """Refresh the contents of an index.

    Re-reads every path currently in the index from the working tree
    and stores the fresh entry back -- the same staging of tracked
    files that 'git commit -a' performs before committing.  Paths
    whose fresh entry cannot be built are left unchanged.

    Args:
      index: Index to update
      root_path: Root filesystem path
    """
    for path, fresh_entry in iter_fresh_entries(index, root_path):
        if fresh_entry is not None:
            index[path] = fresh_entry
class locked_index:
    """Lock the index while making modifications.

    Works as a context manager.

    On entry the index lock is taken and the current Index is returned;
    on clean exit the (possibly modified) entries are written out and
    the lock released, on error the write is aborted.
    """

    def __init__(self, path: Union[bytes, str]) -> None:
        self._path = path

    def __enter__(self):
        # NOTE(review): GitFile "wb" presumably implements git's
        # <path>.lock lockfile protocol -- verify in dulwich.file.
        self._file = GitFile(self._path, "wb")
        self._index = Index(self._path)
        return self._index

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            # Exception inside the with-block: drop the lock without
            # touching the index file, then let the exception propagate.
            self._file.abort()
            return
        try:
            f = SHA1Writer(self._file)
            write_index_dict(f, self._index._byname)
        except BaseException:
            # NOTE(review): a failure while writing is swallowed after
            # aborting -- the caller never learns the write failed.
            self._file.abort()
        else:
            f.close()