Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/objects/tree.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

154 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

# Names exported by a star-import of this module.
__all__ = [
    "TreeModifier",
    "Tree",
]

7 

import os
import sys

import git.diff as git_diff
from git.util import IterableList, join_path, to_bin_sha

from . import util
from .base import IndexObjUnion, IndexObject
from .blob import Blob
from .fun import tree_entries_from_data, tree_to_stream
from .submodule.base import Submodule

# typing -------------------------------------------------

from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Tuple,
    TYPE_CHECKING,
    Type,
    Union,
    cast,
    overload,
)

if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal

from git.types import PathLike

if TYPE_CHECKING:
    from io import BytesIO

    from git.repo import Repo

47 

# One raw tree entry as stored in a tree's cache: (binsha, mode, name).
TreeCacheTup = Tuple[bytes, int, str]

# Edge shape used in the annotations of Tree.traverse(): a (source, destination) pair.
# The previous definition wrapped a single 3-tuple in ``Union[...]``, which collapses
# to that 3-tuple and never describes a pair.
# NOTE(review): the pair shapes below follow the upstream Traversable API (edges are
# 2-tuples; a submodule edge links two submodules) - confirm against git.objects.util.
TraversedTreeTup = Union[
    Tuple[Union["Tree", None], IndexObjUnion],
    Tuple["Submodule", "Submodule"],
]

51 

52# -------------------------------------------------------- 

53 

54 

def cmp(a: str, b: str) -> int:
    """Three-way comparison of two strings.

    :return:
        ``1`` if `a` is greater than `b`, ``-1`` if it is less, ``0`` otherwise.
    """
    is_greater = a > b
    is_less = a < b
    return int(is_greater) - int(is_less)

57 

58 

class TreeModifier:
    """Helper that edits a tree's entry cache through a small list-like API.

    Nothing is copied: :attr:`_cache` is the very list handed to the constructor, so
    every change is visible to whoever owns that list. After the last change, call
    :meth:`set_done` to sort the entries, which puts them into a serializable state.
    """

    __slots__ = ("_cache",)

    def __init__(self, cache: List[TreeCacheTup]) -> None:
        self._cache = cache

    def _index_by_name(self, name: str) -> int:
        """:return: position of the first entry called `name`, or -1 if there is none"""
        positions = (pos for pos, entry in enumerate(self._cache) if entry[2] == name)
        return next(positions, -1)

    # { Interface
    def set_done(self) -> "TreeModifier":
        """Sort the cache; call this after you finished modifying the tree information.

        Calling it more than once is allowed, but every call sorts again.

        :return:
            self
        """

        def sort_key(entry: TreeCacheTup) -> str:
            # Entries whose mode is exactly the tree mode sort as if their name
            # ended in "/".
            entry_name = entry[2]
            if entry[1] == Tree.tree_id << 12:
                return entry_name + "/"
            return entry_name

        self._cache.sort(key=sort_key)
        return self

    # } END interface

    # { Mutators
    def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
        """Insert an entry into the tree.

        Adding a name that is already present is a no-op when sha and mode are the
        same as the stored ones. If they differ, the stored entry is replaced when
        `force` is ``True``; otherwise a :exc:`ValueError` is raised.

        :param sha:
            The 20 or 40 byte sha of the item to add.

        :param mode:
            :class:`int` representing the stat-compatible mode of the item.

        :param name:
            Name of the entry. It must not contain ``/``.

        :param force:
            If ``True``, an item with your name and information will overwrite any
            existing item with the same name, no matter which information it has.

        :raise ValueError:
            If `name` contains ``/``, if `mode` does not map to a known object type,
            or if a differing entry of that name exists and `force` is ``False``.

        :return:
            self
        """
        if "/" in name:
            raise ValueError("Name must not contain '/' characters")
        if (mode >> 12) not in Tree._map_id_to_type:
            raise ValueError("Invalid object type according to mode %o" % mode)

        sha = to_bin_sha(sha)
        position = self._index_by_name(name)
        entry = (sha, mode, name)

        # New name: simply append.
        if position == -1:
            self._cache.append(entry)
            return self

        # Known name, caller wants it replaced unconditionally.
        if force:
            self._cache[position] = entry
            return self

        # Known name without force: only an identical entry is acceptable.
        existing = self._cache[position]
        if existing[0] != sha or existing[1] != mode:
            raise ValueError("Item %r existed with different properties" % name)
        return self

    def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
        """Append an entry without validating it or looking for duplicates.

        The input is trusted; making sure it is correct is up to the caller. See
        :meth:`add` for the meaning of the parameters.

        :param binsha:
            20 byte binary sha.
        """
        assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
        self._cache.append((binsha, mode, name))

    def __delitem__(self, name: str) -> None:
        """Remove the entry called `name`; do nothing if there is no such entry."""
        position = self._index_by_name(name)
        if position < 0:
            return
        del self._cache[position]

    # } END mutators

163 

164 

class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
    R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
    other :class:`Tree`\s.

    See :manpage:`gitglossary(7)` on "tree object":
    https://git-scm.com/docs/gitglossary#def_tree_object

    Subscripting is supported, as with a list or dict:

    * Access a specific blob using the ``tree["filename"]`` notation.
    * You may likewise access by index, like ``blob = tree[0]``.
    * A slice, like ``tree[1:3]``, returns a list of the selected entries.

    Entries are held in :attr:`_cache` as ``(binsha, mode, name)`` tuples, which is
    filled on first use, and are converted to objects only when they are accessed.
    """

    type: Literal["tree"] = "tree"

    __slots__ = ("_cache",)

    # Actual integer IDs for comparison. They are matched against ``mode >> 12``.
    commit_id = 0o16  # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
    blob_id = 0o10
    symlink_id = 0o12
    tree_id = 0o04

    _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
        commit_id: Submodule,
        blob_id: Blob,
        symlink_id: Blob,
        # Tree ID added once Tree is defined.
    }

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: int = tree_id << 12,
        path: Union[PathLike, None] = None,
    ):
        """Forward all arguments to the base initializer; `mode` defaults to the
        tree mode (``tree_id << 12``)."""
        super().__init__(repo, binsha, mode, path)

    @classmethod
    def _get_intermediate_items(
        cls,
        index_object: IndexObjUnion,
    ) -> Union[Tuple["Tree", ...], Tuple[()]]:
        """:return: The entries of `index_object` as objects if it is a tree, otherwise
        an empty tuple."""
        if index_object.type == "tree":
            return tuple(index_object._iter_convert_to_object(index_object._cache))
        return ()

    def _set_cache_(self, attr: str) -> None:
        """Populate `attr` on demand. ``_cache`` is parsed from this tree's data in the
        object database; any other attribute is handled by the base class."""
        if attr == "_cache":
            # Set the data when we need it.
            ostream = self.repo.odb.stream(self.binsha)
            self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
        else:
            super()._set_cache_(attr)
        # END handle attribute

    def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
        """Iterable yields tuples of (binsha, mode, name), which will be converted to
        the respective object representation.

        :raise TypeError:
            If the mode of an entry does not map to a known object type.
        """
        for binsha, mode, name in iterable:
            path = join_path(self.path, name)
            # Only the type lookup is guarded, so that a KeyError coming from the
            # object's constructor or from the consumer of this generator is not
            # misreported as an unknown mode.
            try:
                object_type = self._map_id_to_type[mode >> 12]
            except KeyError as e:
                raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
            yield object_type(self.repo, binsha, mode, path)
        # END for each item

    def join(self, file: PathLike) -> IndexObjUnion:
        """Find the named object in this tree's contents.

        :param file:
            Name of a direct entry, or a ``/``-separated path that is resolved one
            component at a time through nested trees.

        :return:
            :class:`~git.objects.blob.Blob`, :class:`Tree`, or
            :class:`~git.objects.submodule.base.Submodule`

        :raise KeyError:
            If the given file or tree does not exist in this tree.
        """
        msg = "Blob or Tree named %r not found"
        file = os.fspath(file)
        if "/" in file:
            tree = self
            item = self
            tokens = file.split("/")
            for i, token in enumerate(tokens):
                item = tree[token]
                if item.type == "tree":
                    tree = item
                else:
                    # Safety assertion - blobs are at the end of the path.
                    if i != len(tokens) - 1:
                        raise KeyError(msg % file)
                    return item
                # END handle item type
            # END for each token of split path
            if item == self:
                raise KeyError(msg % file)
            return item
        else:
            for info in self._cache:
                if info[2] == file:  # [2] == name
                    return self._map_id_to_type[info[1] >> 12](
                        self.repo, info[0], info[1], join_path(self.path, info[2])
                    )
            # END for each obj
            raise KeyError(msg % file)
        # END handle long paths

    def __truediv__(self, file: PathLike) -> IndexObjUnion:
        """The ``/`` operator is another syntax for joining.

        See :meth:`join` for details.
        """
        return self.join(file)

    @property
    def trees(self) -> List["Tree"]:
        """:return: list(Tree, ...) List of trees directly below this tree"""
        return [i for i in self if i.type == "tree"]

    @property
    def blobs(self) -> List[Blob]:
        """:return: list(Blob, ...) List of blobs directly below this tree"""
        return [i for i in self if i.type == "blob"]

    @property
    def cache(self) -> TreeModifier:
        """
        :return:
            An object allowing modification of the internal cache. This can be used to
            change the tree's contents. When done, make sure you call
            :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
            behaviour will be incorrect.

        :note:
            See :class:`TreeModifier` for more information on how to alter the cache.
        """
        return TreeModifier(self._cache)

    def traverse(
        self,
        predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = False,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
        """For documentation, see
        `Traversable._traverse() <git.objects.util.Traversable._traverse>`.

        Trees are set to ``visit_once = False`` to gain more performance in the
        traversal.
        """
        # `as_edge` is forwarded by keyword; it used to be accepted here but never
        # passed on, so requesting edges had no effect.
        # NOTE(review): relies on Traversable._traverse() accepting `as_edge` - confirm.
        return cast(
            Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
            super()._traverse(
                predicate,  # type: ignore[arg-type]
                prune,  # type: ignore[arg-type]
                depth,
                branch_first,
                visit_once,
                ignore_self,
                as_edge=as_edge,
            ),
        )

    def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
        """
        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`

            Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        return super()._list_traverse(*args, **kwargs)

    # List protocol

    def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
        """Return the entries ``i`` to ``j`` as objects.

        Python 3 never calls this implicitly (slicing goes through
        :meth:`__getitem__`); it is kept for callers that invoke it by name.
        """
        return list(self._iter_convert_to_object(self._cache[i:j]))

    def __iter__(self) -> Iterator[IndexObjUnion]:
        return self._iter_convert_to_object(self._cache)

    def __len__(self) -> int:
        return len(self._cache)

    @overload
    def __getitem__(self, item: Union[str, int]) -> IndexObjUnion: ...

    @overload
    def __getitem__(self, item: slice) -> List[IndexObjUnion]: ...

    def __getitem__(self, item: Union[str, int, slice]) -> Union[IndexObjUnion, List[IndexObjUnion]]:
        """Look up entries by position, by name/path, or by slice.

        :param item:
            An :class:`int` index into the entries, a :class:`str` that is resolved
            with :meth:`join`, or a :class:`slice` of entry positions.

        :return:
            A single object for an index or name, a list of objects for a slice.

        :raise TypeError:
            If `item` is of any other type.
        """
        if isinstance(item, int):
            info = self._cache[item]
            return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))

        if isinstance(item, str):
            # compatibility
            return self.join(item)
        # END index is basestring

        if isinstance(item, slice):
            # Slices are part of the accepted argument types but used to be rejected.
            return list(self._iter_convert_to_object(self._cache[item]))

        raise TypeError("Invalid index type: %r" % item)

    def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
        """Tell whether `item` is a direct entry of this tree.

        An index object is matched by its binsha against the stored entries. Anything
        else is treated as a repo-relative path and compared with the full path of
        every direct entry.
        """
        if isinstance(item, IndexObject):
            return any(item.binsha == info[0] for info in self._cache)
        # END handle item is index object

        # Treat item as repo-relative path. Path-like objects are converted first,
        # since e.g. a pathlib.Path never compares equal to a str.
        if isinstance(item, os.PathLike):
            item = os.fspath(item)
        path = self.path
        return any(item == join_path(path, info[2]) for info in self._cache)

    def __reversed__(self) -> Iterator[IndexObjUnion]:
        """Iterate over the entries as objects, last entry first."""
        # reversed() needs a sequence, so it is applied to the cache list and not to
        # the generator returned by _iter_convert_to_object(), which is not reversible.
        return self._iter_convert_to_object(reversed(self._cache))

    def _serialize(self, stream: "BytesIO") -> "Tree":
        """Serialize this tree into the stream. Assumes sorted tree data.

        :note:
            We will assume our tree data to be in a sorted state. If this is not the
            case, serialization will not generate a correct tree representation as these
            are assumed to be sorted by algorithms.
        """
        tree_to_stream(self._cache, stream.write)
        return self

    def _deserialize(self, stream: "BytesIO") -> "Tree":
        """Replace the cached entries with those parsed from `stream`."""
        self._cache = tree_entries_from_data(stream.read())
        return self


# END tree

# Finalize map definition: the class cannot refer to itself inside its own body, so
# the tree ID is registered here, once the class object exists.
Tree._map_id_to_type[Tree.tree_id] = Tree