Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/index/fun.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This module is part of GitPython and is released under the
2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
4"""Standalone functions to accompany the index implementation and make it more
5versatile."""
# Public names of this module, i.e. what ``from <module> import *`` exports.
# Helpers such as read_header and aggressive_tree_merge are defined in this file
# but are not listed here, so they must be imported by name.
__all__ = [
    "write_cache",
    "read_cache",
    "write_tree_from_cache",
    "entry_key",
    "stat_mode_to_index_mode",
    "S_IFGITLINK",
    "run_commit_hook",
    "hook_path",
]
18from io import BytesIO
19import os
20import os.path as osp
21from pathlib import Path
22from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR
23import subprocess
24import sys
26from gitdb.base import IStream
27from gitdb.typ import str_tree_type
29from git.cmd import handle_process_output, safer_popen
30from git.compat import defenc, force_bytes, force_text, safe_decode
31from git.exc import HookExecutionError, UnmergedEntriesError
32from git.objects.fun import (
33 traverse_tree_recursive,
34 traverse_trees_recursive,
35 tree_to_stream,
36)
37from git.util import IndexFileSHA1Writer, finalize_process
39from .typ import CE_EXTENDED, BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
40from .util import pack, unpack
42# typing -----------------------------------------------------------------------------
44from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast
46from git.types import PathLike
48if TYPE_CHECKING:
49 from git.db import GitCmdObjectDB
50 from git.objects.tree import TreeCacheTup
52 from .base import IndexFile
54# ------------------------------------------------------------------------------------
S_IFGITLINK = S_IFLNK | S_IFDIR
"""Flags for a submodule."""

# Bitwise complement of CE_NAMEMASK. write_cache ANDs an entry's flags with this
# value to clear the stored path-length bits before inserting the current length.
CE_NAMEMASK_INV = ~CE_NAMEMASK
def hook_path(name: str, git_dir: PathLike) -> str:
    """Build the location of a hook script inside a git repository directory.

    :param name:
        Name of the hook, like ``pre-commit``.

    :param git_dir:
        The git repository directory whose ``hooks`` subdirectory holds the hook.

    :return:
        Path to the given named hook in the given git repository directory.
    """
    components = (git_dir, "hooks", name)
    return osp.join(*components)
def _has_file_extension(path: str) -> str:
    """Return the extension of `path` as reported by :func:`os.path.splitext`.

    Despite the name, the result is a string rather than a bool: it is the
    extension including its leading dot, or ``""`` when there is none. Callers
    rely on its truthiness.
    """
    _stem, extension = osp.splitext(path)
    return extension
def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
    """Execute the named commit hook for the repository that owns `index`.

    Hooks that do not exist, or that are not executable according to
    :func:`os.access`, are silently skipped.

    :param name:
        Name of hook, like ``pre-commit``.

    :param index:
        :class:`~git.index.base.IndexFile` instance.

    :param args:
        Arguments passed to hook file.

    :raise git.exc.HookExecutionError:
        If preparing or starting the hook process fails, or if the hook exits with
        a non-zero return code.
    """
    hook = hook_path(name, index.repo.git_dir)
    if not os.access(hook, os.X_OK):
        return

    # The hook sees the caller's environment plus the index file to operate on.
    hook_env = os.environ.copy()
    hook_env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
    hook_env["GIT_EDITOR"] = ":"

    command = [hook]
    try:
        if sys.platform == "win32" and not _has_file_extension(hook):
            # Windows only uses extensions to determine how to open files
            # (doesn't understand shebangs). Try using bash to run the hook.
            hook_in_worktree = Path(hook).relative_to(index.repo.working_dir)
            command = ["bash.exe", hook_in_worktree.as_posix()]

        process = safer_popen(
            [*command, *args],
            env=hook_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=index.repo.working_dir,
        )
    except Exception as ex:
        raise HookExecutionError(hook, ex) from ex

    # The process started; collect its output and wait for it to finish.
    captured_out: List[str] = []
    captured_err: List[str] = []
    handle_process_output(process, captured_out.append, captured_err.append, finalize_process)
    stdout = "".join(captured_out)
    stderr = "".join(captured_err)
    if process.returncode == 0:
        return

    stdout = force_text(stdout, defenc)
    stderr = force_text(stderr, defenc)
    raise HookExecutionError(hook, process.returncode, stderr, stdout)
def stat_mode_to_index_mode(mode: int) -> int:
    """Translate a mode obtained from a stat call into the mode stored in the index.

    :param mode:
        Mode value as returned by a stat call.

    :return:
        ``S_IFLNK`` for symlinks, ``S_IFGITLINK`` for directories and gitlinks
        (submodules), and otherwise a regular-file mode with permissions ``0o755``
        if the owner-executable bit is set, else ``0o644``.
    """
    if S_ISLNK(mode):  # symlinks
        return S_IFLNK
    if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK:  # submodules
        return S_IFGITLINK
    # Blobs only distinguish between executable and non-executable.
    permissions = 0o755 if mode & S_IXUSR else 0o644
    return S_IFREG | permissions
def write_cache(
    entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
    stream: IO[bytes],
    extension_data: Union[None, bytes] = None,
    ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
    """Write the cache represented by entries to a stream.

    The output consists of the ``DIRC`` signature, the version and the number of
    entries, one record per entry, the optional extension data, and finally the
    sha written by the ``ShaStreamCls`` wrapper. Version 3 is written if any entry
    has extended flags, version 2 otherwise.

    :param entries:
        **Sorted** list of entries.

    :param stream:
        Stream to wrap into the `ShaStreamCls` - it is used for final output.

    :param ShaStreamCls:
        Type to use when writing to the stream. It produces a sha while writing to it,
        before the data is passed on to the wrapped stream.

    :param extension_data:
        Any kind of data to write as a trailer, it must begin with a 4 byte
        identifier, followed by its size (4 bytes).

    :raise AssertionError:
        If the encoded path of an entry is too long for the name-length bits of
        the flags field.
    """
    # Wrap the stream into a compatible writer.
    stream_sha = ShaStreamCls(stream)

    tell = stream_sha.tell
    write = stream_sha.write

    # Header
    version = 3 if any(entry.extended_flags for entry in entries) else 2
    write(b"DIRC")
    write(pack(">LL", version, len(entries)))

    # Body
    for entry in entries:
        beginoffset = tell()
        write(entry.ctime_bytes)  # ctime
        write(entry.mtime_bytes)  # mtime
        path_str = str(entry.path)
        path: bytes = force_bytes(path_str, encoding=defenc)
        plen = len(path) & CE_NAMEMASK  # Path length
        # Raised explicitly (not via ``assert``) so the check also runs under
        # ``python -O``; otherwise a truncated length would be written silently.
        if plen != len(path):
            raise AssertionError("Path %s too long to fit into index" % entry.path)
        flags = plen | (entry.flags & CE_NAMEMASK_INV)  # Clear possible previous values.
        if entry.extended_flags:
            flags |= CE_EXTENDED
        write(
            pack(
                ">LLLLLL20sH",
                entry.dev,
                entry.inode,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.binsha,
                flags,
            )
        )
        if entry.extended_flags:
            write(pack(">H", entry.extended_flags))
        write(path)
        # Pad with 1 to 8 NUL bytes so the entry length is a multiple of 8.
        real_size = (tell() - beginoffset + 8) & ~7
        write(b"\0" * ((beginoffset + real_size) - tell()))
    # END for each entry

    # Write previously cached extensions data.
    if extension_data is not None:
        stream_sha.write(extension_data)

    # Write the sha over the content.
    stream_sha.write_sha()
def read_header(stream: IO[bytes]) -> Tuple[int, int]:
    """Return tuple(version_long, num_entries) from the given stream.

    Reads the 4 byte signature followed by two big-endian 32 bit unsigned values
    from the current position of the stream.

    :param stream:
        Binary stream positioned at the start of the index header.

    :raise AssertionError:
        If the signature is not ``DIRC`` or the version is not 1, 2 or 3.
    """
    type_id = stream.read(4)
    if type_id != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % type_id)
    version, num_entries = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))

    # Raised explicitly (not via ``assert``) so that the check is not stripped
    # under ``python -O``, matching the signature check above.
    if version not in (1, 2, 3):
        raise AssertionError("Unsupported git index version %i, only 1, 2, and 3 are supported" % version)
    return version, num_entries
def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
    """
    :return:
        Key suitable to be used for the
        :attr:`index.entries <git.index.base.IndexFile.entries>` dictionary.

    :param entry:
        One instance of type BaseIndexEntry or the path and the stage.
    """
    if len(entry) != 1:
        # Path and stage were passed separately; the argument tuple is the key.
        return cast(Tuple[PathLike, int], entry)

    index_entry = entry[0]
    assert isinstance(index_entry, BaseIndexEntry)
    return (index_entry.path, index_entry.stage)
def read_cache(
    stream: IO[bytes],
) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
    """Read a cache file from the given stream.

    :param stream:
        Binary stream positioned at the start of the index header.

    :return:
        tuple(version, entries_dict, extension_data, content_sha)

        * *version* is the integer version number.
        * *entries_dict* is a dictionary which maps a ``(path, stage)`` tuple to
          the IndexEntry instance found at that path and stage.
        * *extension_data* is ``b""`` or 4 bytes of type + 4 bytes of size + size bytes.
        * *content_sha* is a 20 byte sha on all cache file contents.

    :raise AssertionError:
        If the header is invalid, or if fewer than 20 bytes follow the entries.
    """
    version, num_entries = read_header(stream)
    entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}

    read = stream.read
    tell = stream.tell
    for _ in range(num_entries):
        beginoffset = tell()
        ctime = unpack(">8s", read(8))[0]
        mtime = unpack(">8s", read(8))[0]
        (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
        extended_flags = 0
        if flags & CE_EXTENDED:
            extended_flags = unpack(">H", read(2))[0]
        path_size = flags & CE_NAMEMASK
        path = read(path_size).decode(defenc)

        # Skip the NUL padding that rounds the entry up to a multiple of 8 bytes.
        real_size = (tell() - beginoffset + 8) & ~7
        read((beginoffset + real_size) - tell())
        entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size, extended_flags))
        # entry_key would be the method to use, but we save the effort.
        entries[(path, entry.stage)] = entry
    # END for each entry

    # The footer contains extension data and a sha on the content so far.
    # Keep the extension footer, and verify we have a sha in the end.
    # Extension data format is:
    #   4 bytes ID
    #   4 bytes length of chunk
    #   Repeated 0 - N times
    extension_data = stream.read(-1)  # Everything that is left in the stream.
    # Raised explicitly (not via ``assert``) so a truncated file is also detected
    # under ``python -O``.
    if len(extension_data) <= 19:
        raise AssertionError(
            "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)
        )

    content_sha = extension_data[-20:]

    # Truncate the sha in the end as we will dynamically create it anyway.
    extension_data = extension_data[:-20]

    return (version, entries, extension_data, content_sha)
def write_tree_from_cache(
    entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
) -> Tuple[bytes, List["TreeCacheTup"]]:
    R"""Build a tree object from a range of sorted entries and store it, together
    with all of its subtrees, in the given object database.

    :param entries:
        **Sorted** list of :class:`~git.index.typ.IndexEntry`\s.

    :param odb:
        Object database to store the trees in.

    :param sl:
        Slice indicating the range we should process on the entries list.

    :param si:
        Start index into each entry's path at which names of this tree level begin.

    :return:
        tuple(binsha, list(tree_entry, ...))

        A tuple of the binary sha of the stored tree and its list of tree entries,
        each being a tuple of binsha, mode, name.

    :raise git.exc.UnmergedEntriesError:
        If an entry with a stage other than 0 is encountered.
    """
    tree_items: List["TreeCacheTup"] = []

    position = sl.start
    stop = sl.stop
    while position < stop:
        entry = entries[position]
        if entry.stage != 0:
            raise UnmergedEntriesError(entry)
        position += 1

        slash = entry.path.find("/", si)
        if slash == -1:
            # No further path separator: the entry lives directly in this tree.
            tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
            continue

        # The entry belongs to a subdirectory. Extend the range over all
        # following entries that share the same directory name at this level.
        dirname = entry.path[si:slash]
        run_end = position
        while run_end < stop:
            candidate = entries[run_end]
            candidate_slash = candidate.path.find("/", si)
            if candidate_slash == -1 or candidate.path[si:candidate_slash] != dirname:
                break
            run_end += 1

        # Recurse for the subtree; position - 1 so the current entry is included.
        subtree_sha, _subtree_items = write_tree_from_cache(
            entries, odb, slice(position - 1, run_end), slash + 1
        )
        tree_items.append((subtree_sha, S_IFDIR, dirname))

        # Continue after the entries consumed by the subtree.
        position = run_end

    # Finally serialize the collected items and store them as a tree object.
    buffer = BytesIO()
    tree_to_stream(tree_items, buffer.write)  # Writes bytes, leaves tree_items untouched.
    buffer.seek(0)

    istream = odb.store(IStream(str_tree_type, len(buffer.getvalue()), buffer))
    return (istream.binsha, tree_items)
def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
    """Convert a tree entry tuple into a :class:`~git.index.typ.BaseIndexEntry`.

    The first two items of the tree entry are swapped to form the entry's leading
    fields, and `stage` is shifted into the stage bits of the flags field.
    """
    mode = tree_entry[1]
    binsha = tree_entry[0]
    flags = stage << CE_STAGESHIFT
    path = tree_entry[2]
    return BaseIndexEntry((mode, binsha, flags, path))
def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
    R"""
    :return:
        List of :class:`~git.index.typ.BaseIndexEntry`\s representing the aggressive
        merge of the given trees. All valid entries are on stage 0, whereas the
        conflicting ones are left on stage 1, 2 or 3, whereas stage 1 corresponds to the
        common ancestor tree, 2 to our tree and 3 to 'their' tree.

    :param odb:
        Object database the trees are read from.

    :param tree_shas:
        1, 2 or 3 trees as identified by their binary 20 byte shas. If 1 or two, the
        entries will effectively correspond to the last given tree. If 3 are given, a 3
        way merge is performed.

    :raise ValueError:
        If more than 3 trees are given.
    """
    to_entry = _tree_entry_to_baseindexentry

    # One and two way is the same for us, as we don't have to handle an existing
    # index: simply take every entry of the last tree.
    if len(tree_shas) in (1, 2):
        return [to_entry(entry, 0) for entry in traverse_tree_recursive(odb, tree_shas[-1], "")]

    if len(tree_shas) > 3:
        raise ValueError("Cannot handle %i trees at once" % len(tree_shas))

    # Three trees. Items compared below: index 0 and index 1 of each tree entry.
    merged: List[BaseIndexEntry] = []
    for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
        if base is None:
            # Not in the common ancestor. All three can't be None.
            if ours is None:
                # Added in their branch.
                assert theirs is not None
                merged.append(to_entry(theirs, 0))
            elif theirs is None:
                # Added in our branch.
                merged.append(to_entry(ours, 0))
            elif ours[0] != theirs[0] or ours[1] != theirs[1]:
                # Added differently in both branches, conflict.
                merged.append(to_entry(ours, 2))
                merged.append(to_entry(theirs, 3))
            else:
                # It was added the same in both.
                merged.append(to_entry(ours, 0))
            continue

        if ours is None:
            # Deleted in ours. If it is deleted in theirs too, or theirs did not
            # change it, it is simply gone; otherwise it is a conflict.
            if theirs is not None and (theirs[0] != base[0] or theirs[1] != base[1]):
                merged.append(to_entry(base, 1))
                merged.append(to_entry(theirs, 3))
            continue

        if theirs is None:
            # They deleted it. If we changed it, it is a conflict; otherwise ignore.
            if ours[0] != base[0] or ours[1] != base[1]:
                merged.append(to_entry(base, 1))
                merged.append(to_entry(ours, 2))
            continue

        # It exists in all branches. If it was changed in both, it's a conflict.
        # Otherwise, we take the changed version.
        if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
            base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
        ):
            # Changed by both.
            merged.append(to_entry(base, 1))
            merged.append(to_entry(ours, 2))
            merged.append(to_entry(theirs, 3))
        elif base[0] != ours[0] or base[1] != ours[1]:
            # Only we changed it.
            merged.append(to_entry(ours, 0))
        else:
            # Either nobody changed it, or they did. In either case, use theirs.
            merged.append(to_entry(theirs, 0))

    return merged