Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/objects/tree.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
6__all__ = ["TreeModifier", "Tree"]
8import os
9import sys
11import git.diff as git_diff
12from git.util import IterableList, join_path, to_bin_sha
14from . import util
15from .base import IndexObjUnion, IndexObject
16from .blob import Blob
17from .fun import tree_entries_from_data, tree_to_stream
18from .submodule.base import Submodule
20# typing -------------------------------------------------
22from typing import (
23 Any,
24 Callable,
25 Dict,
26 Iterable,
27 Iterator,
28 List,
29 Tuple,
30 TYPE_CHECKING,
31 Type,
32 Union,
33 cast,
34)
36if sys.version_info >= (3, 8):
37 from typing import Literal
38else:
39 from typing_extensions import Literal
41from git.types import PathLike
43if TYPE_CHECKING:
44 from io import BytesIO
46 from git.repo import Repo
48TreeCacheTup = Tuple[bytes, int, str]
50TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
52# --------------------------------------------------------
55def cmp(a: str, b: str) -> int:
56 return (a > b) - (a < b)
59class TreeModifier:
60 """A utility class providing methods to alter the underlying cache in a list-like
61 fashion.
63 Once all adjustments are complete, the :attr:`_cache`, which really is a reference
64 to the cache of a tree, will be sorted. This ensures it will be in a serializable
65 state.
66 """
68 __slots__ = ("_cache",)
70 def __init__(self, cache: List[TreeCacheTup]) -> None:
71 self._cache = cache
73 def _index_by_name(self, name: str) -> int:
74 """:return: index of an item with name, or -1 if not found"""
75 for i, t in enumerate(self._cache):
76 if t[2] == name:
77 return i
78 # END found item
79 # END for each item in cache
80 return -1
82 # { Interface
83 def set_done(self) -> "TreeModifier":
84 """Call this method once you are done modifying the tree information.
86 This may be called several times, but be aware that each call will cause a sort
87 operation.
89 :return:
90 self
91 """
92 self._cache.sort(key=lambda x: (x[2] + "/") if x[1] == Tree.tree_id << 12 else x[2])
93 return self
95 # } END interface
97 # { Mutators
98 def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
99 """Add the given item to the tree.
101 If an item with the given name already exists, nothing will be done, but a
102 :exc:`ValueError` will be raised if the sha and mode of the existing item do not
103 match the one you add, unless `force` is ``True``.
105 :param sha:
106 The 20 or 40 byte sha of the item to add.
108 :param mode:
109 :class:`int` representing the stat-compatible mode of the item.
111 :param force:
112 If ``True``, an item with your name and information will overwrite any
113 existing item with the same name, no matter which information it has.
115 :return:
116 self
117 """
118 if "/" in name:
119 raise ValueError("Name must not contain '/' characters")
120 if (mode >> 12) not in Tree._map_id_to_type:
121 raise ValueError("Invalid object type according to mode %o" % mode)
123 sha = to_bin_sha(sha)
124 index = self._index_by_name(name)
126 item = (sha, mode, name)
128 if index == -1:
129 self._cache.append(item)
130 else:
131 if force:
132 self._cache[index] = item
133 else:
134 ex_item = self._cache[index]
135 if ex_item[0] != sha or ex_item[1] != mode:
136 raise ValueError("Item %r existed with different properties" % name)
137 # END handle mismatch
138 # END handle force
139 # END handle name exists
140 return self
142 def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
143 """Add the given item to the tree. Its correctness is assumed, so it is the
144 caller's responsibility to ensure that the input is correct.
146 For more information on the parameters, see :meth:`add`.
148 :param binsha:
149 20 byte binary sha.
150 """
151 assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
152 tree_cache = (binsha, mode, name)
154 self._cache.append(tree_cache)
156 def __delitem__(self, name: str) -> None:
157 """Delete an item with the given name if it exists."""
158 index = self._index_by_name(name)
159 if index > -1:
160 del self._cache[index]
162 # } END mutators
165class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
166 R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
167 other :class:`Tree`\s.
169 See :manpage:`gitglossary(7)` on "tree object":
170 https://git-scm.com/docs/gitglossary#def_tree_object
172 Subscripting is supported, as with a list or dict:
174 * Access a specific blob using the ``tree["filename"]`` notation.
175 * You may likewise access by index, like ``blob = tree[0]``.
176 """
178 type: Literal["tree"] = "tree"
180 __slots__ = ("_cache",)
182 # Actual integer IDs for comparison.
183 commit_id = 0o16 # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
184 blob_id = 0o10
185 symlink_id = 0o12
186 tree_id = 0o04
188 _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
189 commit_id: Submodule,
190 blob_id: Blob,
191 symlink_id: Blob,
192 # Tree ID added once Tree is defined.
193 }
195 def __init__(
196 self,
197 repo: "Repo",
198 binsha: bytes,
199 mode: int = tree_id << 12,
200 path: Union[PathLike, None] = None,
201 ):
202 super().__init__(repo, binsha, mode, path)
204 @classmethod
205 def _get_intermediate_items(
206 cls,
207 index_object: IndexObjUnion,
208 ) -> Union[Tuple["Tree", ...], Tuple[()]]:
209 if index_object.type == "tree":
210 return tuple(index_object._iter_convert_to_object(index_object._cache))
211 return ()
213 def _set_cache_(self, attr: str) -> None:
214 if attr == "_cache":
215 # Set the data when we need it.
216 ostream = self.repo.odb.stream(self.binsha)
217 self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
218 else:
219 super()._set_cache_(attr)
220 # END handle attribute
222 def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
223 """Iterable yields tuples of (binsha, mode, name), which will be converted to
224 the respective object representation.
225 """
226 for binsha, mode, name in iterable:
227 path = join_path(self.path, name)
228 try:
229 yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
230 except KeyError as e:
231 raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
232 # END for each item
234 def join(self, file: PathLike) -> IndexObjUnion:
235 """Find the named object in this tree's contents.
237 :return:
238 :class:`~git.objects.blob.Blob`, :class:`Tree`, or
239 :class:`~git.objects.submodule.base.Submodule`
241 :raise KeyError:
242 If the given file or tree does not exist in this tree.
243 """
244 msg = "Blob or Tree named %r not found"
245 file = os.fspath(file)
246 if "/" in file:
247 tree = self
248 item = self
249 tokens = file.split("/")
250 for i, token in enumerate(tokens):
251 item = tree[token]
252 if item.type == "tree":
253 tree = item
254 else:
255 # Safety assertion - blobs are at the end of the path.
256 if i != len(tokens) - 1:
257 raise KeyError(msg % file)
258 return item
259 # END handle item type
260 # END for each token of split path
261 if item == self:
262 raise KeyError(msg % file)
263 return item
264 else:
265 for info in self._cache:
266 if info[2] == file: # [2] == name
267 return self._map_id_to_type[info[1] >> 12](
268 self.repo, info[0], info[1], join_path(self.path, info[2])
269 )
270 # END for each obj
271 raise KeyError(msg % file)
272 # END handle long paths
274 def __truediv__(self, file: PathLike) -> IndexObjUnion:
275 """The ``/`` operator is another syntax for joining.
277 See :meth:`join` for details.
278 """
279 return self.join(file)
281 @property
282 def trees(self) -> List["Tree"]:
283 """:return: list(Tree, ...) List of trees directly below this tree"""
284 return [i for i in self if i.type == "tree"]
286 @property
287 def blobs(self) -> List[Blob]:
288 """:return: list(Blob, ...) List of blobs directly below this tree"""
289 return [i for i in self if i.type == "blob"]
291 @property
292 def cache(self) -> TreeModifier:
293 """
294 :return:
295 An object allowing modification of the internal cache. This can be used to
296 change the tree's contents. When done, make sure you call
297 :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
298 behaviour will be incorrect.
300 :note:
301 See :class:`TreeModifier` for more information on how to alter the cache.
302 """
303 return TreeModifier(self._cache)
305 def traverse(
306 self,
307 predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
308 prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
309 depth: int = -1,
310 branch_first: bool = True,
311 visit_once: bool = False,
312 ignore_self: int = 1,
313 as_edge: bool = False,
314 ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
315 """For documentation, see
316 `Traversable._traverse() <git.objects.util.Traversable._traverse>`.
318 Trees are set to ``visit_once = False`` to gain more performance in the
319 traversal.
320 """
322 # # To typecheck instead of using cast.
323 # import itertools
324 # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
325 # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])
327 # ret = super().traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
328 # ret_tup = itertools.tee(ret, 2)
329 # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
330 # return ret_tup[0]
332 return cast(
333 Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
334 super()._traverse(
335 predicate, # type: ignore[arg-type]
336 prune, # type: ignore[arg-type]
337 depth,
338 branch_first,
339 visit_once,
340 ignore_self,
341 ),
342 )
344 def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
345 """
346 :return:
347 :class:`~git.util.IterableList` with the results of the traversal as
348 produced by :meth:`traverse`
350 Tree -> IterableList[Union[Submodule, Tree, Blob]]
351 """
352 return super()._list_traverse(*args, **kwargs)
354 # List protocol
356 def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
357 return list(self._iter_convert_to_object(self._cache[i:j]))
359 def __iter__(self) -> Iterator[IndexObjUnion]:
360 return self._iter_convert_to_object(self._cache)
362 def __len__(self) -> int:
363 return len(self._cache)
365 def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
366 if isinstance(item, int):
367 info = self._cache[item]
368 return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))
370 if isinstance(item, str):
371 # compatibility
372 return self.join(item)
373 # END index is basestring
375 raise TypeError("Invalid index type: %r" % item)
377 def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
378 if isinstance(item, IndexObject):
379 for info in self._cache:
380 if item.binsha == info[0]:
381 return True
382 # END compare sha
383 # END for each entry
384 # END handle item is index object
385 # compatibility
387 # Treat item as repo-relative path.
388 else:
389 path = self.path
390 for info in self._cache:
391 if item == join_path(path, info[2]):
392 return True
393 # END for each item
394 return False
396 def __reversed__(self) -> Iterator[IndexObjUnion]:
397 return reversed(self._iter_convert_to_object(self._cache)) # type: ignore[call-overload]
399 def _serialize(self, stream: "BytesIO") -> "Tree":
400 """Serialize this tree into the stream. Assumes sorted tree data.
402 :note:
403 We will assume our tree data to be in a sorted state. If this is not the
404 case, serialization will not generate a correct tree representation as these
405 are assumed to be sorted by algorithms.
406 """
407 tree_to_stream(self._cache, stream.write)
408 return self
410 def _deserialize(self, stream: "BytesIO") -> "Tree":
411 self._cache = tree_entries_from_data(stream.read())
412 return self
415# END tree
417# Finalize map definition.
418Tree._map_id_to_type[Tree.tree_id] = Tree