Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/objects/tree.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
6__all__ = ["TreeModifier", "Tree"]
8import sys
10import git.diff as git_diff
11from git.util import IterableList, join_path, to_bin_sha
13from . import util
14from .base import IndexObjUnion, IndexObject
15from .blob import Blob
16from .fun import tree_entries_from_data, tree_to_stream
17from .submodule.base import Submodule
19# typing -------------------------------------------------
21from typing import (
22 Any,
23 Callable,
24 Dict,
25 Iterable,
26 Iterator,
27 List,
28 Tuple,
29 TYPE_CHECKING,
30 Type,
31 Union,
32 cast,
33)
35if sys.version_info >= (3, 8):
36 from typing import Literal
37else:
38 from typing_extensions import Literal
40from git.types import PathLike
42if TYPE_CHECKING:
43 from io import BytesIO
45 from git.repo import Repo
47TreeCacheTup = Tuple[bytes, int, str]
49TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
51# --------------------------------------------------------
53cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b)
56class TreeModifier:
57 """A utility class providing methods to alter the underlying cache in a list-like
58 fashion.
60 Once all adjustments are complete, the :attr:`_cache`, which really is a reference
61 to the cache of a tree, will be sorted. This ensures it will be in a serializable
62 state.
63 """
65 __slots__ = ("_cache",)
67 def __init__(self, cache: List[TreeCacheTup]) -> None:
68 self._cache = cache
70 def _index_by_name(self, name: str) -> int:
71 """:return: index of an item with name, or -1 if not found"""
72 for i, t in enumerate(self._cache):
73 if t[2] == name:
74 return i
75 # END found item
76 # END for each item in cache
77 return -1
79 # { Interface
80 def set_done(self) -> "TreeModifier":
81 """Call this method once you are done modifying the tree information.
83 This may be called several times, but be aware that each call will cause a sort
84 operation.
86 :return:
87 self
88 """
89 self._cache.sort(key=lambda x: (x[2] + "/") if x[1] == Tree.tree_id << 12 else x[2])
90 return self
92 # } END interface
94 # { Mutators
95 def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
96 """Add the given item to the tree.
98 If an item with the given name already exists, nothing will be done, but a
99 :exc:`ValueError` will be raised if the sha and mode of the existing item do not
100 match the one you add, unless `force` is ``True``.
102 :param sha:
103 The 20 or 40 byte sha of the item to add.
105 :param mode:
106 :class:`int` representing the stat-compatible mode of the item.
108 :param force:
109 If ``True``, an item with your name and information will overwrite any
110 existing item with the same name, no matter which information it has.
112 :return:
113 self
114 """
115 if "/" in name:
116 raise ValueError("Name must not contain '/' characters")
117 if (mode >> 12) not in Tree._map_id_to_type:
118 raise ValueError("Invalid object type according to mode %o" % mode)
120 sha = to_bin_sha(sha)
121 index = self._index_by_name(name)
123 item = (sha, mode, name)
125 if index == -1:
126 self._cache.append(item)
127 else:
128 if force:
129 self._cache[index] = item
130 else:
131 ex_item = self._cache[index]
132 if ex_item[0] != sha or ex_item[1] != mode:
133 raise ValueError("Item %r existed with different properties" % name)
134 # END handle mismatch
135 # END handle force
136 # END handle name exists
137 return self
139 def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
140 """Add the given item to the tree. Its correctness is assumed, so it is the
141 caller's responsibility to ensure that the input is correct.
143 For more information on the parameters, see :meth:`add`.
145 :param binsha:
146 20 byte binary sha.
147 """
148 assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
149 tree_cache = (binsha, mode, name)
151 self._cache.append(tree_cache)
153 def __delitem__(self, name: str) -> None:
154 """Delete an item with the given name if it exists."""
155 index = self._index_by_name(name)
156 if index > -1:
157 del self._cache[index]
159 # } END mutators
162class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
163 R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
164 other :class:`Tree`\s.
166 See :manpage:`gitglossary(7)` on "tree object":
167 https://git-scm.com/docs/gitglossary#def_tree_object
169 Subscripting is supported, as with a list or dict:
171 * Access a specific blob using the ``tree["filename"]`` notation.
172 * You may likewise access by index, like ``blob = tree[0]``.
173 """
175 type: Literal["tree"] = "tree"
177 __slots__ = ("_cache",)
179 # Actual integer IDs for comparison.
180 commit_id = 0o16 # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
181 blob_id = 0o10
182 symlink_id = 0o12
183 tree_id = 0o04
185 _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
186 commit_id: Submodule,
187 blob_id: Blob,
188 symlink_id: Blob,
189 # Tree ID added once Tree is defined.
190 }
192 def __init__(
193 self,
194 repo: "Repo",
195 binsha: bytes,
196 mode: int = tree_id << 12,
197 path: Union[PathLike, None] = None,
198 ):
199 super().__init__(repo, binsha, mode, path)
201 @classmethod
202 def _get_intermediate_items(
203 cls,
204 index_object: IndexObjUnion,
205 ) -> Union[Tuple["Tree", ...], Tuple[()]]:
206 if index_object.type == "tree":
207 return tuple(index_object._iter_convert_to_object(index_object._cache))
208 return ()
210 def _set_cache_(self, attr: str) -> None:
211 if attr == "_cache":
212 # Set the data when we need it.
213 ostream = self.repo.odb.stream(self.binsha)
214 self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
215 else:
216 super()._set_cache_(attr)
217 # END handle attribute
219 def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
220 """Iterable yields tuples of (binsha, mode, name), which will be converted to
221 the respective object representation.
222 """
223 for binsha, mode, name in iterable:
224 path = join_path(self.path, name)
225 try:
226 yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
227 except KeyError as e:
228 raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
229 # END for each item
231 def join(self, file: str) -> IndexObjUnion:
232 """Find the named object in this tree's contents.
234 :return:
235 :class:`~git.objects.blob.Blob`, :class:`Tree`, or
236 :class:`~git.objects.submodule.base.Submodule`
238 :raise KeyError:
239 If the given file or tree does not exist in this tree.
240 """
241 msg = "Blob or Tree named %r not found"
242 if "/" in file:
243 tree = self
244 item = self
245 tokens = file.split("/")
246 for i, token in enumerate(tokens):
247 item = tree[token]
248 if item.type == "tree":
249 tree = item
250 else:
251 # Safety assertion - blobs are at the end of the path.
252 if i != len(tokens) - 1:
253 raise KeyError(msg % file)
254 return item
255 # END handle item type
256 # END for each token of split path
257 if item == self:
258 raise KeyError(msg % file)
259 return item
260 else:
261 for info in self._cache:
262 if info[2] == file: # [2] == name
263 return self._map_id_to_type[info[1] >> 12](
264 self.repo, info[0], info[1], join_path(self.path, info[2])
265 )
266 # END for each obj
267 raise KeyError(msg % file)
268 # END handle long paths
270 def __truediv__(self, file: str) -> IndexObjUnion:
271 """The ``/`` operator is another syntax for joining.
273 See :meth:`join` for details.
274 """
275 return self.join(file)
277 @property
278 def trees(self) -> List["Tree"]:
279 """:return: list(Tree, ...) List of trees directly below this tree"""
280 return [i for i in self if i.type == "tree"]
282 @property
283 def blobs(self) -> List[Blob]:
284 """:return: list(Blob, ...) List of blobs directly below this tree"""
285 return [i for i in self if i.type == "blob"]
287 @property
288 def cache(self) -> TreeModifier:
289 """
290 :return:
291 An object allowing modification of the internal cache. This can be used to
292 change the tree's contents. When done, make sure you call
293 :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
294 behaviour will be incorrect.
296 :note:
297 See :class:`TreeModifier` for more information on how to alter the cache.
298 """
299 return TreeModifier(self._cache)
301 def traverse(
302 self,
303 predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
304 prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
305 depth: int = -1,
306 branch_first: bool = True,
307 visit_once: bool = False,
308 ignore_self: int = 1,
309 as_edge: bool = False,
310 ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
311 """For documentation, see
312 `Traversable._traverse() <git.objects.util.Traversable._traverse>`.
314 Trees are set to ``visit_once = False`` to gain more performance in the
315 traversal.
316 """
318 # # To typecheck instead of using cast.
319 # import itertools
320 # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
321 # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])
323 # ret = super().traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
324 # ret_tup = itertools.tee(ret, 2)
325 # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
326 # return ret_tup[0]
328 return cast(
329 Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
330 super()._traverse(
331 predicate, # type: ignore[arg-type]
332 prune, # type: ignore[arg-type]
333 depth,
334 branch_first,
335 visit_once,
336 ignore_self,
337 ),
338 )
340 def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
341 """
342 :return:
343 :class:`~git.util.IterableList` with the results of the traversal as
344 produced by :meth:`traverse`
346 Tree -> IterableList[Union[Submodule, Tree, Blob]]
347 """
348 return super()._list_traverse(*args, **kwargs)
350 # List protocol
352 def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
353 return list(self._iter_convert_to_object(self._cache[i:j]))
355 def __iter__(self) -> Iterator[IndexObjUnion]:
356 return self._iter_convert_to_object(self._cache)
358 def __len__(self) -> int:
359 return len(self._cache)
361 def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
362 if isinstance(item, int):
363 info = self._cache[item]
364 return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
366 if isinstance(item, str):
367 # compatibility
368 return self.join(item)
369 # END index is basestring
371 raise TypeError("Invalid index type: %r" % item)
373 def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
374 if isinstance(item, IndexObject):
375 for info in self._cache:
376 if item.binsha == info[0]:
377 return True
378 # END compare sha
379 # END for each entry
380 # END handle item is index object
381 # compatibility
383 # Treat item as repo-relative path.
384 else:
385 path = self.path
386 for info in self._cache:
387 if item == join_path(path, info[2]):
388 return True
389 # END for each item
390 return False
392 def __reversed__(self) -> Iterator[IndexObjUnion]:
393 return reversed(self._iter_convert_to_object(self._cache)) # type: ignore[call-overload]
395 def _serialize(self, stream: "BytesIO") -> "Tree":
396 """Serialize this tree into the stream. Assumes sorted tree data.
398 :note:
399 We will assume our tree data to be in a sorted state. If this is not the
400 case, serialization will not generate a correct tree representation as these
401 are assumed to be sorted by algorithms.
402 """
403 tree_to_stream(self._cache, stream.write)
404 return self
406 def _deserialize(self, stream: "BytesIO") -> "Tree":
407 self._cache = tree_entries_from_data(stream.read())
408 return self
411# END tree
413# Finalize map definition.
414Tree._map_id_to_type[Tree.tree_id] = Tree