Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/index/fun.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This module is part of GitPython and is released under the
2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
4"""Standalone functions to accompany the index implementation and make it more
5versatile."""
7__all__ = [
8 "write_cache",
9 "read_cache",
10 "write_tree_from_cache",
11 "entry_key",
12 "stat_mode_to_index_mode",
13 "S_IFGITLINK",
14 "run_commit_hook",
15 "hook_path",
16]
18from io import BytesIO
19import os
20import os.path as osp
21from pathlib import Path
22from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR
23import subprocess
24import sys
26from gitdb.base import IStream
27from gitdb.typ import str_tree_type
29from git.cmd import handle_process_output, safer_popen
30from git.compat import defenc, force_bytes, force_text, safe_decode
31from git.exc import HookExecutionError, UnmergedEntriesError
32from git.objects.fun import (
33 traverse_tree_recursive,
34 traverse_trees_recursive,
35 tree_to_stream,
36)
37from git.util import IndexFileSHA1Writer, finalize_process
39from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
40from .util import pack, unpack
42# typing -----------------------------------------------------------------------------
44from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast
46from git.types import PathLike
48if TYPE_CHECKING:
49 from git.db import GitCmdObjectDB
50 from git.objects.tree import TreeCacheTup
52 from .base import IndexFile
54# ------------------------------------------------------------------------------------
56S_IFGITLINK = S_IFLNK | S_IFDIR
57"""Flags for a submodule."""
59CE_NAMEMASK_INV = ~CE_NAMEMASK
def hook_path(name: str, git_dir: PathLike) -> str:
    """:return: path to the hook named ``name`` inside the given git repository directory"""
    hooks_dir = osp.join(git_dir, "hooks")
    return osp.join(hooks_dir, name)
67def _has_file_extension(path: str) -> str:
68 return osp.splitext(path)[1]
def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
    """Run the commit hook of the given name. Silently ignore hooks that do not exist.

    :param name:
        Name of hook, like ``pre-commit``.

    :param index:
        :class:`~git.index.base.IndexFile` instance.

    :param args:
        Arguments passed to hook file.

    :raise git.exc.HookExecutionError:
    """
    hook = hook_path(name, index.repo.git_dir)
    if not os.access(hook, os.X_OK):
        # A missing or non-executable hook is silently skipped.
        return

    hook_env = os.environ.copy()
    hook_env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
    hook_env["GIT_EDITOR"] = ":"
    cmd = [hook]
    try:
        if sys.platform == "win32" and not _has_file_extension(hook):
            # Windows only uses extensions to determine how to open files
            # (doesn't understand shebangs). Try using bash to run the hook.
            relative_hook = Path(hook).relative_to(index.repo.working_dir).as_posix()
            cmd = ["bash.exe", relative_hook]

        process = safer_popen(
            [*cmd, *args],
            env=hook_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=index.repo.working_dir,
        )
    except Exception as ex:
        raise HookExecutionError(hook, ex) from ex
    else:
        stdout_chunks: List[str] = []
        stderr_chunks: List[str] = []
        handle_process_output(process, stdout_chunks.append, stderr_chunks.append, finalize_process)
        if process.returncode != 0:
            stdout_text = force_text("".join(stdout_chunks), defenc)
            stderr_text = force_text("".join(stderr_chunks), defenc)
            raise HookExecutionError(hook, process.returncode, stderr_text, stdout_text)
        # END handle return code
def stat_mode_to_index_mode(mode: int) -> int:
    """Convert the given mode from a stat call to the corresponding index mode and
    return it.

    :param mode:
        Mode bits as returned by :func:`os.stat` (``st_mode``).

    :return:
        One of ``S_IFLNK`` (symlink), ``S_IFGITLINK`` (submodule), or ``S_IFREG``
        combined with permission bits ``0o755``/``0o644`` (blob).
    """
    if S_ISLNK(mode):  # symlinks
        return S_IFLNK
    if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK:  # submodules
        return S_IFGITLINK
    # Blobs keep only the owner-executable bit, normalized to one of the two
    # permission sets git recognizes. (The previous `and/or` ternary idiom is
    # deprecated; an explicit conditional expression is equivalent and clearer.)
    return S_IFREG | (0o755 if mode & S_IXUSR else 0o644)
def write_cache(
    entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
    stream: IO[bytes],
    extension_data: Union[None, bytes] = None,
    ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
    """Write the cache represented by entries to a stream.

    :param entries:
        **Sorted** list of entries.

    :param stream:
        Stream to wrap into the AdapterStreamCls - it is used for final output.

    :param ShaStreamCls:
        Type to use when writing to the stream. It produces a sha while writing to it,
        before the data is passed on to the wrapped stream.

    :param extension_data:
        Any kind of data to write as a trailer, it must begin a 4 byte identifier,
        followed by its size (4 bytes).
    """
    # All output goes through a sha-producing wrapper so the checksum over the
    # whole file content can be appended at the end.
    sha_writer = ShaStreamCls(stream)

    # Header: signature, format version (2), number of entries.
    sha_writer.write(b"DIRC")
    sha_writer.write(pack(">LL", 2, len(entries)))

    # Body: per entry, a fixed-size record followed by the path, NUL-padded so
    # the next record starts on an 8-byte boundary.
    for entry in entries:
        record_start = sha_writer.tell()
        sha_writer.write(entry.ctime_bytes)  # ctime
        sha_writer.write(entry.mtime_bytes)  # mtime
        path_bytes: bytes = force_bytes(str(entry.path), encoding=defenc)
        name_len = len(path_bytes) & CE_NAMEMASK  # Path length
        assert name_len == len(path_bytes), "Path %s too long to fit into index" % entry.path
        flags = name_len | (entry.flags & CE_NAMEMASK_INV)  # Clear possible previous values.
        sha_writer.write(
            pack(
                ">LLLLLL20sH",
                entry.dev,
                entry.inode,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.binsha,
                flags,
            )
        )
        sha_writer.write(path_bytes)
        # Round the record size up to the next multiple of 8 (with at least one
        # NUL terminator) and pad accordingly.
        record_size = (sha_writer.tell() - record_start + 8) & ~7
        sha_writer.write(b"\0" * ((record_start + record_size) - sha_writer.tell()))
    # END for each entry

    # Write previously cached extensions data.
    if extension_data is not None:
        sha_writer.write(extension_data)

    # Write the sha over the content.
    sha_writer.write_sha()
def read_header(stream: IO[bytes]) -> Tuple[int, int]:
    """Return tuple(version_long, num_entries) from the given stream.

    :param stream:
        Binary stream positioned at the start of an index file.

    :raise AssertionError:
        If the stream does not start with the ``DIRC`` signature, or the version
        is not one we can handle.
    """
    type_id = stream.read(4)
    if type_id != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % type_id)
    unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
    version, num_entries = unpacked

    # TODO: Handle version 3: extended data, see read-cache.c.
    # Raise explicitly rather than `assert`, so the check survives `python -O`
    # and matches the explicit-raise style of the signature check above.
    if version not in (1, 2):
        raise AssertionError("Unsupported index file version: %i" % version)
    return version, num_entries
def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
    """
    :return:
        Key suitable to be used for the
        :attr:`index.entries <git.index.base.IndexFile.entries>` dictionary.

    :param entry:
        One instance of type BaseIndexEntry or the path and the stage.
    """
    if len(entry) != 1:
        # Already given as (path, stage) — pass it through unchanged.
        return cast(Tuple[PathLike, int], entry)
    (sole_entry,) = entry
    assert isinstance(sole_entry, BaseIndexEntry)
    return (sole_entry.path, sole_entry.stage)
    # END handle entry
def read_cache(
    stream: IO[bytes],
) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
    """Read a cache file from the given stream.

    :return:
        tuple(version, entries_dict, extension_data, content_sha)

        * *version* is the integer version number.
        * *entries_dict* is a dictionary which maps IndexEntry instances to a path at a
          stage.
        * *extension_data* is ``""`` or 4 bytes of type + 4 bytes of size + size bytes.
        * *content_sha* is a 20 byte sha on all cache file contents.
    """
    version, num_entries = read_header(stream)
    entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}

    for _ in range(num_entries):
        record_start = stream.tell()
        ctime = unpack(">8s", stream.read(8))[0]
        mtime = unpack(">8s", stream.read(8))[0]
        (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", stream.read(20 + 4 * 6 + 2))
        path_size = flags & CE_NAMEMASK
        path = stream.read(path_size).decode(defenc)

        # Records are NUL-padded to an 8-byte boundary; skip the padding bytes.
        record_size = (stream.tell() - record_start + 8) & ~7
        stream.read((record_start + record_size) - stream.tell())
        entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
        # entry_key would be the method to use, but we save the effort.
        entries[(path, entry.stage)] = entry
    # END for each entry

    # The footer contains extension data and a sha on the content so far.
    # Keep the extension footer, and verify we have a sha in the end.
    # Extension data format is:
    #   4 bytes ID
    #   4 bytes length of chunk
    # Repeated 0 - N times
    extension_data = stream.read(~0)
    assert len(extension_data) > 19, (
        "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)
    )

    # The trailing 20 bytes are the checksum over everything before them.
    content_sha = extension_data[-20:]

    # Truncate the sha in the end as we will dynamically create it anyway.
    extension_data = extension_data[:-20]

    return (version, entries, extension_data, content_sha)
def write_tree_from_cache(
    entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
) -> Tuple[bytes, List["TreeCacheTup"]]:
    R"""Create a tree from the given sorted list of entries and put the respective
    trees into the given object database.

    :param entries:
        **Sorted** list of :class:`~git.index.typ.IndexEntry`\s.

    :param odb:
        Object database to store the trees in.

    :param si:
        Start index at which we should start creating subtrees.

    :param sl:
        Slice indicating the range we should process on the entries list.

    :raise git.exc.UnmergedEntriesError:
        If any entry in the processed range is not on stage 0.

    :return:
        tuple(binsha, list(tree_entry, ...))

        A tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name.
    """
    tree_items: List["TreeCacheTup"] = []

    ci = sl.start
    end = sl.stop
    while ci < end:
        entry = entries[ci]
        if entry.stage != 0:
            raise UnmergedEntriesError(entry)
        # END abort on unmerged
        ci += 1
        # A '/' at or after offset `si` means this entry lives in a subdirectory
        # relative to the tree level we are currently building.
        rbound = entry.path.find("/", si)
        if rbound == -1:
            # It's not a tree.
            tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
        else:
            # Find common base range: all consecutive entries sharing the same
            # directory name at this level belong to the same subtree.
            base = entry.path[si:rbound]
            xi = ci
            while xi < end:
                oentry = entries[xi]
                orbound = oentry.path.find("/", si)
                if orbound == -1 or oentry.path[si:orbound] != base:
                    break
                # END abort on base mismatch
                xi += 1
            # END find common base

            # Enter recursion.
            # ci - 1 as we want to count our current item as well.
            # rbound + 1 skips past the '/' so the subtree sees only the remainder
            # of each path.
            sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
            tree_items.append((sha, S_IFDIR, base))

            # Skip ahead.
            ci = xi
        # END handle bounds
    # END for each entry

    # Finally create the tree.
    sio = BytesIO()
    tree_to_stream(tree_items, sio.write)  # Writes to stream as bytes, but doesn't change tree_items.
    sio.seek(0)

    istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
    return (istream.binsha, tree_items)
def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
    """Wrap a (binsha, mode, name) tree-cache tuple as a BaseIndexEntry on the given stage."""
    binsha, mode, name = tree_entry
    return BaseIndexEntry((mode, binsha, stage << CE_STAGESHIFT, name))
def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
    R"""
    :return:
        List of :class:`~git.index.typ.BaseIndexEntry`\s representing the aggressive
        merge of the given trees. All valid entries are on stage 0, whereas the
        conflicting ones are left on stage 1, 2 or 3, where stage 1 corresponds to the
        common ancestor tree, 2 to our tree and 3 to 'their' tree.

    :param tree_shas:
        1, 2 or 3 trees as identified by their binary 20 byte shas. If 1 or two, the
        entries will effectively correspond to the last given tree. If 3 are given, a 3
        way merge is performed.

    :raise ValueError:
        If more than three trees are given.
    """
    out: List[BaseIndexEntry] = []

    # One and two way is the same for us, as we don't have to handle an existing
    # index: the entries of the last tree are taken as-is at stage 0.
    if len(tree_shas) in (1, 2):
        for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
            out.append(_tree_entry_to_baseindexentry(entry, 0))
        # END for each entry
        return out
    # END handle single tree

    if len(tree_shas) > 3:
        raise ValueError("Cannot handle %i trees at once" % len(tree_shas))

    # Three trees: each traversal item is a (base, ours, theirs) triple of tree
    # entries; any of the three may be None when missing from that tree.
    for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
        if base is not None:
            # Base version exists.
            if ours is not None:
                # Ours exists.
                if theirs is not None:
                    # It exists in all branches. If it was changed in both,
                    # it's a conflict. Otherwise, we take the changed version.
                    # This should be the most common branch, so it comes first.
                    # Index [0] is the binsha, [1] the mode of a tree entry.
                    if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
                        base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
                    ):
                        # Changed by both.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    elif base[0] != ours[0] or base[1] != ours[1]:
                        # Only we changed it.
                        out.append(_tree_entry_to_baseindexentry(ours, 0))
                    else:
                        # Either nobody changed it, or they did. In either
                        # case, use theirs.
                        out.append(_tree_entry_to_baseindexentry(theirs, 0))
                    # END handle modification
                else:
                    if ours[0] != base[0] or ours[1] != base[1]:
                        # They deleted it, we changed it, conflict.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                    # else:
                    #   # We didn't change it, ignore.
                    #   pass
                    # END handle our change
                # END handle theirs
            else:
                if theirs is None:
                    # Deleted in both, it's fine - it's out.
                    pass
                else:
                    if theirs[0] != base[0] or theirs[1] != base[1]:
                        # Deleted in ours, changed theirs, conflict.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    # END theirs changed
                    # else:
                    #   # Theirs didn't change.
                    #   pass
                    # END handle theirs
            # END handle ours
        else:
            # All three can't be None.
            if ours is None:
                # Added in their branch.
                assert theirs is not None
                out.append(_tree_entry_to_baseindexentry(theirs, 0))
            elif theirs is None:
                # Added in our branch.
                out.append(_tree_entry_to_baseindexentry(ours, 0))
            else:
                # Both have it, except for the base, see whether it changed.
                if ours[0] != theirs[0] or ours[1] != theirs[1]:
                    out.append(_tree_entry_to_baseindexentry(ours, 2))
                    out.append(_tree_entry_to_baseindexentry(theirs, 3))
                else:
                    # It was added the same in both.
                    out.append(_tree_entry_to_baseindexentry(ours, 0))
                # END handle two items
            # END handle heads
        # END handle base exists
    # END for each entries tuple
    return out