Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/objects/tree.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
6__all__ = ["TreeModifier", "Tree"]
8import sys
10import git.diff as git_diff
11from git.util import IterableList, join_path, to_bin_sha
13from . import util
14from .base import IndexObjUnion, IndexObject
15from .blob import Blob
16from .fun import tree_entries_from_data, tree_to_stream
17from .submodule.base import Submodule
19# typing -------------------------------------------------
21from typing import (
22 Any,
23 Callable,
24 Dict,
25 Iterable,
26 Iterator,
27 List,
28 Tuple,
29 TYPE_CHECKING,
30 Type,
31 Union,
32 cast,
33)
35if sys.version_info >= (3, 8):
36 from typing import Literal
37else:
38 from typing_extensions import Literal
40from git.types import PathLike
42if TYPE_CHECKING:
43 from io import BytesIO
45 from git.repo import Repo
# One raw tree entry as kept in a tree's cache: (binsha, mode, name).
TreeCacheTup = Tuple[bytes, int, str]

# Tuple form that appears next to plain index objects in the predicate, prune and
# return annotations of Tree.traverse().
TraversedTreeTup = Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]
51# --------------------------------------------------------
def cmp(a: str, b: str) -> int:
    """Three-way comparison of two strings.

    :return:
        ``1`` if `a` compares greater than `b`, ``-1`` if it compares less, and ``0``
        otherwise.
    """
    is_greater = a > b
    is_less = a < b
    return int(is_greater) - int(is_less)
class TreeModifier:
    """Helper that edits a tree's entry cache through a list-like interface.

    The wrapped :attr:`_cache` is the list object handed to the constructor, not a
    copy, so every modification is made directly on that list. Call :meth:`set_done`
    after the last modification: it sorts the entries, which ensures the cache is in a
    serializable state.
    """

    __slots__ = ("_cache",)

    def __init__(self, cache: List[TreeCacheTup]) -> None:
        self._cache = cache

    def _index_by_name(self, name: str) -> int:
        """:return: position of the first entry called `name`, or -1 if there is none"""
        return next((pos for pos, entry in enumerate(self._cache) if entry[2] == name), -1)

    # { Interface
    def set_done(self) -> "TreeModifier":
        """Sort the cache; call this once you have finished modifying the tree.

        Calling it more than once is allowed, but each call sorts the cache again.

        :return:
            self
        """

        def sort_key(entry: "TreeCacheTup") -> str:
            # Entries whose mode equals the plain tree mode are ordered as if their
            # name ended with a slash; all other entries are ordered by name alone.
            if entry[1] == Tree.tree_id << 12:
                return entry[2] + "/"
            return entry[2]

        self._cache.sort(key=sort_key)
        return self

    # } END interface

    # { Mutators
    def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
        """Add the given item to the tree.

        When an entry called `name` is already present and `force` is ``False``, the
        cache is left untouched; the call only succeeds if the existing entry has the
        same sha and mode.

        :param sha:
            The 20 or 40 byte sha of the item to add.

        :param mode:
            :class:`int` representing the stat-compatible mode of the item.

        :param name:
            Name of the entry. It must not contain ``/``.

        :param force:
            If ``True``, an existing entry with the same name is replaced by the new
            one, whatever its sha and mode are.

        :raise ValueError:
            If `name` contains ``/``, if `mode` does not map to a known object type,
            or if an entry with this name but a different sha or mode exists and
            `force` is ``False``.

        :return:
            self
        """
        if "/" in name:
            raise ValueError("Name must not contain '/' characters")
        if (mode >> 12) not in Tree._map_id_to_type:
            raise ValueError("Invalid object type according to mode %o" % mode)

        binsha = to_bin_sha(sha)
        position = self._index_by_name(name)
        new_entry = (binsha, mode, name)

        # Unknown name: simply append.
        if position == -1:
            self._cache.append(new_entry)
            return self

        # Known name and overwriting requested: replace in place.
        if force:
            self._cache[position] = new_entry
            return self

        # Known name without force: only an identical entry is acceptable.
        existing = self._cache[position]
        if not (existing[0] == binsha and existing[1] == mode):
            raise ValueError("Item %r existed with different properties" % name)
        return self

    def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
        """Append an entry without validating it or looking for duplicates.

        The caller is responsible for the correctness of the input; only the argument
        types are asserted. See :meth:`add` for the meaning of the parameters.

        :param binsha:
            20 byte binary sha.
        """
        assert isinstance(binsha, bytes)
        assert isinstance(mode, int)
        assert isinstance(name, str)

        self._cache.append((binsha, mode, name))

    def __delitem__(self, name: str) -> None:
        """Remove the first entry called `name`; do nothing if there is none."""
        position = self._index_by_name(name)
        if position < 0:
            return
        del self._cache[position]

    # } END mutators
class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
    R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
    other :class:`Tree`\s.

    See :manpage:`gitglossary(7)` on "tree object":
    https://git-scm.com/docs/gitglossary#def_tree_object

    Subscripting is supported, as with a list or dict:

    * Access a specific blob using the ``tree["filename"]`` notation.
    * You may likewise access by index, like ``blob = tree[0]``.
    """

    type: Literal["tree"] = "tree"

    __slots__ = ("_cache",)

    # Actual integer IDs for comparison.
    commit_id = 0o16  # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
    blob_id = 0o10
    symlink_id = 0o12
    tree_id = 0o04

    # Maps ``mode >> 12`` of a tree entry to the class used to represent that entry.
    _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
        commit_id: Submodule,
        blob_id: Blob,
        symlink_id: Blob,
        # Tree ID added once Tree is defined.
    }

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: int = tree_id << 12,
        path: Union[PathLike, None] = None,
    ):
        super().__init__(repo, binsha, mode, path)

    @classmethod
    def _get_intermediate_items(
        cls,
        index_object: IndexObjUnion,
    ) -> Union[Tuple["Tree", ...], Tuple[()]]:
        """:return: all direct children of `index_object` if it is a tree, else ``()``"""
        if index_object.type == "tree":
            return tuple(index_object._iter_convert_to_object(index_object._cache))
        return ()

    def _set_cache_(self, attr: str) -> None:
        """Load the entry cache from the object database when `attr` is ``_cache``.

        Any other attribute is delegated to the base class.
        """
        if attr == "_cache":
            # Set the data when we need it.
            ostream = self.repo.odb.stream(self.binsha)
            self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
        else:
            super()._set_cache_(attr)
        # END handle attribute

    def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
        """Iterable yields tuples of (binsha, mode, name), which will be converted to
        the respective object representation.

        :raise TypeError:
            If the mode of an entry does not map to a known object type.
        """
        for binsha, mode, name in iterable:
            path = join_path(self.path, name)
            try:
                yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
            except KeyError as e:
                raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
        # END for each item

    def join(self, file: str) -> IndexObjUnion:
        """Find the named object in this tree's contents.

        :param file:
            Name of a direct child, or a ``/``-separated path that is resolved one
            component at a time through nested trees.

        :return:
            :class:`~git.objects.blob.Blob`, :class:`Tree`, or
            :class:`~git.objects.submodule.base.Submodule`

        :raise KeyError:
            If the given file or tree does not exist in this tree.
        """
        msg = "Blob or Tree named %r not found"
        if "/" in file:
            tree = self
            item = self
            tokens = file.split("/")
            for i, token in enumerate(tokens):
                item = tree[token]
                if item.type == "tree":
                    tree = item
                else:
                    # Safety assertion - blobs are at the end of the path.
                    if i != len(tokens) - 1:
                        raise KeyError(msg % file)
                    return item
                # END handle item type
            # END for each token of split path
            if item == self:
                raise KeyError(msg % file)
            return item
        else:
            for info in self._cache:
                if info[2] == file:  # [2] == name
                    return self._map_id_to_type[info[1] >> 12](
                        self.repo, info[0], info[1], join_path(self.path, info[2])
                    )
            # END for each obj
            raise KeyError(msg % file)
        # END handle long paths

    def __truediv__(self, file: str) -> IndexObjUnion:
        """The ``/`` operator is another syntax for joining.

        See :meth:`join` for details.
        """
        return self.join(file)

    @property
    def trees(self) -> List["Tree"]:
        """:return: list(Tree, ...) List of trees directly below this tree"""
        return [i for i in self if i.type == "tree"]

    @property
    def blobs(self) -> List[Blob]:
        """:return: list(Blob, ...) List of blobs directly below this tree"""
        return [i for i in self if i.type == "blob"]

    @property
    def cache(self) -> TreeModifier:
        """
        :return:
            An object allowing modification of the internal cache. This can be used to
            change the tree's contents. When done, make sure you call
            :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
            behaviour will be incorrect.

        :note:
            See :class:`TreeModifier` for more information on how to alter the cache.
        """
        return TreeModifier(self._cache)

    def traverse(
        self,
        predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = False,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
        """For documentation, see
        `Traversable._traverse() <git.objects.util.Traversable._traverse>`.

        Trees are set to ``visit_once = False`` to gain more performance in the
        traversal.

        All arguments, including `as_edge`, are forwarded unchanged to ``_traverse()``.
        """
        # The cast only narrows the declared return type for type checkers; it has no
        # effect at runtime.
        return cast(
            Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
            super()._traverse(
                predicate,  # type: ignore[arg-type]
                prune,  # type: ignore[arg-type]
                depth,
                branch_first,
                visit_once,
                ignore_self,
                # Forward as_edge as well so that the argument is not silently ignored.
                as_edge,
            ),
        )

    def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
        """
        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`

            Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        return super()._list_traverse(*args, **kwargs)

    # List protocol

    def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
        """:return: objects for the cache entries in the half-open range ``[i, j)``"""
        return list(self._iter_convert_to_object(self._cache[i:j]))

    def __iter__(self) -> Iterator[IndexObjUnion]:
        return self._iter_convert_to_object(self._cache)

    def __len__(self) -> int:
        return len(self._cache)

    def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
        """Return the entry at integer position `item`, or the entry named `item`.

        :raise TypeError:
            If `item` is neither an :class:`int` nor a :class:`str`. This includes
            slice objects.

        :raise KeyError:
            If `item` is a string that :meth:`join` cannot resolve.
        """
        if isinstance(item, int):
            info = self._cache[item]
            return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))

        if isinstance(item, str):
            # compatibility
            return self.join(item)
        # END index is basestring

        raise TypeError("Invalid index type: %r" % item)

    def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
        """Tell whether `item` is a direct child of this tree.

        Index objects are matched by their binary sha. Anything else is compared with
        each entry's name joined onto this tree's own path.
        """
        if isinstance(item, IndexObject):
            for info in self._cache:
                if item.binsha == info[0]:
                    return True
                # END compare sha
            # END for each entry
        # END handle item is index object
        # compatibility

        # Treat item as repo-relative path.
        else:
            path = self.path
            for info in self._cache:
                if item == join_path(path, info[2]):
                    return True
        # END for each item
        return False

    def __reversed__(self) -> Iterator[IndexObjUnion]:
        """:return: iterator over the direct children, last cache entry first"""
        # reversed() cannot be applied to the generator returned by
        # _iter_convert_to_object() (generators are not reversible), so reverse the
        # cached entry list first and convert the entries lazily afterwards.
        return self._iter_convert_to_object(reversed(self._cache))

    def _serialize(self, stream: "BytesIO") -> "Tree":
        """Serialize this tree into the stream. Assumes sorted tree data.

        :note:
            We will assume our tree data to be in a sorted state. If this is not the
            case, serialization will not generate a correct tree representation as these
            are assumed to be sorted by algorithms.
        """
        tree_to_stream(self._cache, stream.write)
        return self

    def _deserialize(self, stream: "BytesIO") -> "Tree":
        """Replace the entry cache with the entries parsed from `stream`.

        :return:
            self
        """
        self._cache = tree_entries_from_data(stream.read())
        return self


# END tree

# Finalize map definition.
Tree._map_id_to_type[Tree.tree_id] = Tree