1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
5
6__all__ = ["TreeModifier", "Tree"]
7
8import sys
9
10import git.diff as git_diff
11from git.util import IterableList, join_path, to_bin_sha
12
13from . import util
14from .base import IndexObjUnion, IndexObject
15from .blob import Blob
16from .fun import tree_entries_from_data, tree_to_stream
17from .submodule.base import Submodule
18
19# typing -------------------------------------------------
20
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Tuple,
    TYPE_CHECKING,
    Type,
    Union,
    cast,
    overload,
)
34
35if sys.version_info >= (3, 8):
36 from typing import Literal
37else:
38 from typing_extensions import Literal
39
40from git.types import PathLike
41
42if TYPE_CHECKING:
43 from io import BytesIO
44
45 from git.repo import Repo
46
# One raw tree entry as stored in a tree's cache list: (binsha, mode, name).
TreeCacheTup = Tuple[bytes, int, str]

# Alternative item type used in the annotations of Tree.traverse() next to
# IndexObjUnion.
# NOTE(review): ``Union`` with a single argument collapses to that argument, so this
# alias is simply the 3-tuple below - confirm whether a union of two tuple shapes
# was intended.
TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]
50
51# --------------------------------------------------------
52
53
def cmp(a: str, b: str) -> int:
    """Three-way comparison of two strings.

    :return:
        ``1`` if `a` sorts after `b`, ``-1`` if it sorts before `b`, and ``0`` if
        neither is the case.
    """
    is_greater = a > b
    is_less = a < b
    # bool is an int subclass, so the difference is -1, 0 or 1.
    return is_greater - is_less
56
57
class TreeModifier:
    """Helper that edits a tree's entry cache through a small list-like API.

    The modifier does not copy anything: :attr:`_cache` is the very list object it
    was constructed with, so every change is visible to the owner of that list.
    Call :meth:`set_done` after the last change to sort the entries, which puts the
    cache into a serializable state.
    """

    __slots__ = ("_cache",)

    def __init__(self, cache: List[TreeCacheTup]) -> None:
        self._cache = cache

    def _index_by_name(self, name: str) -> int:
        """:return: index of the first entry whose name equals `name`, or -1 if there
        is no such entry"""
        matches = (position for position, entry in enumerate(self._cache) if entry[2] == name)
        return next(matches, -1)

    # { Interface
    def set_done(self) -> "TreeModifier":
        """Sort the cache; call this once all modifications have been made.

        Calling it repeatedly is allowed, but every call sorts the cache again.

        :return:
            self
        """

        def _sort_key(entry: TreeCacheTup) -> str:
            # Entries whose mode is exactly the tree mode are ordered as if their
            # name carried a trailing slash; all others sort by their plain name.
            mode, name = entry[1], entry[2]
            if mode == Tree.tree_id << 12:
                return name + "/"
            return name

        self._cache.sort(key=_sort_key)
        return self

    # } END interface

    # { Mutators
    def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
        """Insert an entry into the tree.

        When an entry named `name` is already present and `force` is ``False``, the
        cache is left untouched; a :exc:`ValueError` is raised if that entry's sha or
        mode differs from the values given here.

        :param sha:
            The 20 or 40 byte sha of the item to add.

        :param mode:
            :class:`int` representing the stat-compatible mode of the item.

        :param name:
            Name of the entry. It must not contain ``/``.

        :param force:
            If ``True``, an existing entry with the same name is replaced by the new
            one regardless of its current sha and mode.

        :raise ValueError:
            If `name` contains a ``/``, if `mode` does not map to a known object
            type, or if a differing entry exists and `force` is ``False``.

        :return:
            self
        """
        if "/" in name:
            raise ValueError("Name must not contain '/' characters")
        if (mode >> 12) not in Tree._map_id_to_type:
            raise ValueError("Invalid object type according to mode %o" % mode)

        sha = to_bin_sha(sha)
        position = self._index_by_name(name)
        entry = (sha, mode, name)

        # Unknown name: simply append.
        if position == -1:
            self._cache.append(entry)
            return self

        # Known name and caller wants to overwrite.
        if force:
            self._cache[position] = entry
            return self

        # Known name without force: only an identical entry is acceptable.
        existing = self._cache[position]
        if existing[0] != sha or existing[1] != mode:
            raise ValueError("Item %r existed with different properties" % name)
        return self

    def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
        """Append an entry without validating it or looking for duplicates.

        The caller is responsible for passing correct values; only the argument types
        are asserted.

        For more information on the parameters, see :meth:`add`.

        :param binsha:
            20 byte binary sha.
        """
        assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
        self._cache.append((binsha, mode, name))

    def __delitem__(self, name: str) -> None:
        """Remove the entry called `name`; do nothing if there is none."""
        position = self._index_by_name(name)
        if position < 0:
            return
        del self._cache[position]

    # } END mutators
162
163
class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
    R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
    other :class:`Tree`\s.

    See :manpage:`gitglossary(7)` on "tree object":
    https://git-scm.com/docs/gitglossary#def_tree_object

    Subscripting is supported, as with a list or dict:

    * Access a specific blob using the ``tree["filename"]`` notation.
    * You may likewise access by index, like ``blob = tree[0]``.
    * Slices, like ``tree[0:2]``, return a list of the selected items.
    """

    type: Literal["tree"] = "tree"

    __slots__ = ("_cache",)

    # Actual integer IDs for comparison.
    commit_id = 0o16  # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
    blob_id = 0o10
    symlink_id = 0o12
    tree_id = 0o04

    # Maps ``mode >> 12`` of a tree entry to the class used to represent that entry.
    _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
        commit_id: Submodule,
        blob_id: Blob,
        symlink_id: Blob,
        # Tree ID added once Tree is defined.
    }

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: int = tree_id << 12,
        path: Union[PathLike, None] = None,
    ):
        super().__init__(repo, binsha, mode, path)

    @classmethod
    def _get_intermediate_items(
        cls,
        index_object: IndexObjUnion,
    ) -> Union[Tuple["Tree", ...], Tuple[()]]:
        """:return: the objects directly contained in `index_object` if it is a tree,
        otherwise an empty tuple"""
        if index_object.type == "tree":
            return tuple(index_object._iter_convert_to_object(index_object._cache))
        return ()

    def _set_cache_(self, attr: str) -> None:
        """Populate `attr` on demand.

        ``_cache`` is filled by reading this tree's data from the object database and
        parsing it into entry tuples; every other attribute is delegated to the base
        class.
        """
        if attr == "_cache":
            # Set the data when we need it.
            ostream = self.repo.odb.stream(self.binsha)
            self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
        else:
            super()._set_cache_(attr)
        # END handle attribute

    def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
        """Iterable yields tuples of (binsha, mode, name), which will be converted to
        the respective object representation.

        :raise TypeError:
            If the mode of an entry does not map to a known object type.
        """
        for binsha, mode, name in iterable:
            path = join_path(self.path, name)
            try:
                yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
            except KeyError as e:
                raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
        # END for each item

    def join(self, file: str) -> IndexObjUnion:
        """Find the named object in this tree's contents.

        :param file:
            Name of a direct entry, or a ``/``-separated path that is resolved one
            component at a time through nested trees.

        :return:
            :class:`~git.objects.blob.Blob`, :class:`Tree`, or
            :class:`~git.objects.submodule.base.Submodule`

        :raise KeyError:
            If the given file or tree does not exist in this tree.
        """
        msg = "Blob or Tree named %r not found"
        if "/" in file:
            tree = self
            item = self
            tokens = file.split("/")
            for i, token in enumerate(tokens):
                item = tree[token]
                if item.type == "tree":
                    tree = item
                else:
                    # Safety assertion - blobs are at the end of the path.
                    if i != len(tokens) - 1:
                        raise KeyError(msg % file)
                    return item
                # END handle item type
            # END for each token of split path
            if item == self:
                raise KeyError(msg % file)
            return item
        else:
            for info in self._cache:
                if info[2] == file:  # [2] == name
                    return self._map_id_to_type[info[1] >> 12](
                        self.repo, info[0], info[1], join_path(self.path, info[2])
                    )
            # END for each obj
            raise KeyError(msg % file)
        # END handle long paths

    def __truediv__(self, file: str) -> IndexObjUnion:
        """The ``/`` operator is another syntax for joining.

        See :meth:`join` for details.
        """
        return self.join(file)

    @property
    def trees(self) -> List["Tree"]:
        """:return: list(Tree, ...) List of trees directly below this tree"""
        return [i for i in self if i.type == "tree"]

    @property
    def blobs(self) -> List[Blob]:
        """:return: list(Blob, ...) List of blobs directly below this tree"""
        return [i for i in self if i.type == "blob"]

    @property
    def cache(self) -> TreeModifier:
        """
        :return:
            An object allowing modification of the internal cache. This can be used to
            change the tree's contents. When done, make sure you call
            :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
            behaviour will be incorrect.

        :note:
            See :class:`TreeModifier` for more information on how to alter the cache.
        """
        return TreeModifier(self._cache)

    def traverse(
        self,
        predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = False,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
        """For documentation, see
        `Traversable._traverse() <git.objects.util.Traversable._traverse>`.

        Trees are set to ``visit_once = False`` to gain more performance in the
        traversal.

        All arguments, including `as_edge`, are forwarded to the underlying traversal.
        """

        # # To typecheck instead of using cast.
        # import itertools
        # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
        #     return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])

        # ret = super().traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
        # ret_tup = itertools.tee(ret, 2)
        # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
        # return ret_tup[0]

        return cast(
            Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
            super()._traverse(
                predicate,  # type: ignore[arg-type]
                prune,  # type: ignore[arg-type]
                depth,
                branch_first,
                visit_once,
                ignore_self,
                # Previously dropped, which made the parameter a silent no-op.
                as_edge,
            ),
        )

    def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
        """
        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`

            Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        return super()._list_traverse(*args, **kwargs)

    # List protocol

    def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
        """Return the items at positions ``i`` to ``j`` (exclusive) as a list.

        :note:
            Python 3 never calls this hook for ``tree[i:j]``; slicing is served by
            :meth:`__getitem__`. The method is kept for code that calls it directly.
        """
        return list(self._iter_convert_to_object(self._cache[i:j]))

    def __iter__(self) -> Iterator[IndexObjUnion]:
        return self._iter_convert_to_object(self._cache)

    def __len__(self) -> int:
        return len(self._cache)

    @overload
    def __getitem__(self, item: Union[str, int]) -> IndexObjUnion: ...

    @overload
    def __getitem__(self, item: slice) -> List[IndexObjUnion]: ...

    def __getitem__(self, item: Union[str, int, slice]) -> Union[IndexObjUnion, List[IndexObjUnion]]:
        """Look up items by position, by name/path, or by slice.

        :param item:
            * :class:`int`: position of the entry in this tree.
            * :class:`str`: name or ``/``-separated path, resolved by :meth:`join`.
            * :class:`slice`: range of positions; the result is a list.

        :raise TypeError:
            If `item` is of any other type.
        """
        if isinstance(item, int):
            info = self._cache[item]
            return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))

        if isinstance(item, str):
            # compatibility
            return self.join(item)
        # END index is basestring

        if isinstance(item, slice):
            # Slicing the raw cache first avoids creating objects that are not needed.
            return list(self._iter_convert_to_object(self._cache[item]))

        raise TypeError("Invalid index type: %r" % item)

    def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
        """Check whether `item` is a direct entry of this tree.

        An :class:`~git.objects.base.IndexObject` is matched by its binary sha; any
        other value is compared against the path of each entry, i.e. this tree's path
        joined with the entry name.
        """
        if isinstance(item, IndexObject):
            for info in self._cache:
                if item.binsha == info[0]:
                    return True
                # END compare sha
            # END for each entry
        # END handle item is index object
        # compatibility

        # Treat item as repo-relative path.
        else:
            path = self.path
            for info in self._cache:
                if item == join_path(path, info[2]):
                    return True
        # END for each item
        return False

    def __reversed__(self) -> Iterator[IndexObjUnion]:
        """Iterate over the direct entries of this tree in reverse order."""
        # reversed() cannot consume the generator returned by _iter_convert_to_object,
        # so reverse the underlying list and convert the entries afterwards.
        return self._iter_convert_to_object(reversed(self._cache))

    def _serialize(self, stream: "BytesIO") -> "Tree":
        """Serialize this tree into the stream. Assumes sorted tree data.

        :note:
            We will assume our tree data to be in a sorted state. If this is not the
            case, serialization will not generate a correct tree representation as these
            are assumed to be sorted by algorithms.
        """
        tree_to_stream(self._cache, stream.write)
        return self

    def _deserialize(self, stream: "BytesIO") -> "Tree":
        """Replace the entry cache with the entries parsed from `stream`."""
        self._cache = tree_entries_from_data(stream.read())
        return self
411
412
413# END tree
414
# Finalize map definition: the tree ID can only be registered here, after the class
# statement has completed, because the name ``Tree`` is not yet bound while the class
# body itself is being executed.
Tree._map_id_to_type[Tree.tree_id] = Tree