Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/objects/tree.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

152 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6__all__ = ["TreeModifier", "Tree"] 

7 

8import sys 

9 

10import git.diff as git_diff 

11from git.util import IterableList, join_path, to_bin_sha 

12 

13from . import util 

14from .base import IndexObjUnion, IndexObject 

15from .blob import Blob 

16from .fun import tree_entries_from_data, tree_to_stream 

17from .submodule.base import Submodule 

18 

19# typing ------------------------------------------------- 

20 

21from typing import ( 

22 Any, 

23 Callable, 

24 Dict, 

25 Iterable, 

26 Iterator, 

27 List, 

28 Tuple, 

29 TYPE_CHECKING, 

30 Type, 

31 Union, 

32 cast, 

33) 

34 

35if sys.version_info >= (3, 8): 

36 from typing import Literal 

37else: 

38 from typing_extensions import Literal 

39 

40from git.types import PathLike 

41 

42if TYPE_CHECKING: 

43 from io import BytesIO 

44 

45 from git.repo import Repo 

46 

# One raw tree entry as stored in a tree's cache: (binsha, mode, name).
TreeCacheTup = Tuple[bytes, int, str]

# Item type advertised by Tree.traverse() in addition to plain index objects.
# NOTE(review): this is a single-member Union wrapping one 3-tuple; confirm
# whether two separate tuple alternatives were intended.
TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]]

50 

51# -------------------------------------------------------- 

52 

53 

def cmp(a: str, b: str) -> int:
    """Three-way comparison of two strings.

    :return:
        ``-1`` if `a` sorts before `b`, ``1`` if `a` sorts after `b`, and ``0`` if
        both compare equal.
    """
    # Booleans subtract as 0/1, and at most one of the two comparisons can be true.
    return (a > b) - (a < b)

56 

57 

class TreeModifier:
    """A utility class providing methods to alter the underlying cache in a list-like
    fashion.

    The :attr:`_cache` is a reference to the list handed to the constructor (typically
    the cache of a tree), so every change is made in place. Once all adjustments are
    complete, call :meth:`set_done` to sort the cache. This ensures it will be in a
    serializable state.
    """

    __slots__ = ("_cache",)

    def __init__(self, cache: List[TreeCacheTup]) -> None:
        # Keep the caller's list itself (no copy) so modifications are visible to it.
        self._cache = cache

    def _index_by_name(self, name: str) -> int:
        """:return: index of an item with name, or -1 if not found"""
        for i, t in enumerate(self._cache):
            if t[2] == name:  # [2] == name
                return i
            # END found item
        # END for each item in cache
        return -1

    # { Interface
    def set_done(self) -> "TreeModifier":
        """Call this method once you are done modifying the tree information.

        This may be called several times, but be aware that each call will cause a sort
        operation.

        :return:
            self
        """
        # Entries whose mode is exactly the tree mode are ordered as if their name
        # ended with "/"; all other entries are ordered by their plain name.
        self._cache.sort(key=lambda x: (x[2] + "/") if x[1] == Tree.tree_id << 12 else x[2])
        return self

    # } END interface

    # { Mutators
    def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier":
        """Add the given item to the tree.

        If an item with the given name already exists, nothing will be done, but a
        :exc:`ValueError` will be raised if the sha and mode of the existing item do not
        match the one you add, unless `force` is ``True``.

        :param sha:
            The 20 or 40 byte sha of the item to add.

        :param mode:
            :class:`int` representing the stat-compatible mode of the item.

        :param name:
            Name of the item within this tree. It must not contain ``/``.

        :param force:
            If ``True``, an item with your name and information will overwrite any
            existing item with the same name, no matter which information it has.

        :raise ValueError:
            If `name` contains a ``/``, if `mode` does not map to a known object type,
            or if a differing item of the same name exists and `force` is ``False``.

        :return:
            self
        """
        if "/" in name:
            raise ValueError("Name must not contain '/' characters")
        # The object type is encoded in the bits above the 12 permission bits.
        if (mode >> 12) not in Tree._map_id_to_type:
            raise ValueError("Invalid object type according to mode %o" % mode)

        sha = to_bin_sha(sha)
        index = self._index_by_name(name)

        item = (sha, mode, name)

        if index == -1:
            self._cache.append(item)
        else:
            if force:
                self._cache[index] = item
            else:
                ex_item = self._cache[index]
                if ex_item[0] != sha or ex_item[1] != mode:
                    raise ValueError("Item %r existed with different properties" % name)
                # END handle mismatch
            # END handle force
        # END handle name exists
        return self

    def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None:
        """Add the given item to the tree. Its correctness is assumed, so it is the
        caller's responsibility to ensure that the input is correct.

        Unlike :meth:`add`, this always appends, even if an item of the same name is
        already present.

        For more information on the parameters, see :meth:`add`.

        :param binsha:
            20 byte binary sha.
        """
        # Only a cheap type sanity check; it disappears when running with -O.
        assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str)
        tree_cache = (binsha, mode, name)

        self._cache.append(tree_cache)

    def __delitem__(self, name: str) -> None:
        """Delete an item with the given name if it exists."""
        index = self._index_by_name(name)
        if index > -1:
            del self._cache[index]

    # } END mutators

162 

163 

class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable):
    R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and
    other :class:`Tree`\s.

    See :manpage:`gitglossary(7)` on "tree object":
    https://git-scm.com/docs/gitglossary#def_tree_object

    Subscripting is supported, as with a list or dict:

    * Access a specific blob using the ``tree["filename"]`` notation.
    * You may likewise access by index, like ``blob = tree[0]``.
    """

    type: Literal["tree"] = "tree"

    __slots__ = ("_cache",)

    # Actual integer IDs for comparison.
    commit_id = 0o16  # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link.
    blob_id = 0o10
    symlink_id = 0o12
    tree_id = 0o04

    # Maps ``mode >> 12`` to the class used to represent such an entry.
    _map_id_to_type: Dict[int, Type[IndexObjUnion]] = {
        commit_id: Submodule,
        blob_id: Blob,
        symlink_id: Blob,
        # Tree ID added once Tree is defined.
    }

    def __init__(
        self,
        repo: "Repo",
        binsha: bytes,
        mode: int = tree_id << 12,
        path: Union[PathLike, None] = None,
    ):
        super().__init__(repo, binsha, mode, path)

    @classmethod
    def _get_intermediate_items(
        cls,
        index_object: IndexObjUnion,
    ) -> Union[Tuple["Tree", ...], Tuple[()]]:
        """:return: the direct entries of `index_object` if it is a tree, else ``()``"""
        if index_object.type == "tree":
            return tuple(index_object._iter_convert_to_object(index_object._cache))
        return ()

    def _set_cache_(self, attr: str) -> None:
        if attr == "_cache":
            # Set the data when we need it.
            ostream = self.repo.odb.stream(self.binsha)
            self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read())
        else:
            super()._set_cache_(attr)
        # END handle attribute

    def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]:
        """Iterable yields tuples of (binsha, mode, name), which will be converted to
        the respective object representation.

        :raise TypeError:
            If the mode of an entry does not map to a known object type.
        """
        for binsha, mode, name in iterable:
            path = join_path(self.path, name)
            try:
                yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path)
            except KeyError as e:
                raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e
        # END for each item

    def join(self, file: str) -> IndexObjUnion:
        """Find the named object in this tree's contents.

        :param file:
            Name of a direct entry, or a ``/``-separated path that is resolved one
            component at a time through nested trees.

        :return:
            :class:`~git.objects.blob.Blob`, :class:`Tree`, or
            :class:`~git.objects.submodule.base.Submodule`

        :raise KeyError:
            If the given file or tree does not exist in this tree.
        """
        msg = "Blob or Tree named %r not found"
        if "/" in file:
            tree = self
            item = self
            tokens = file.split("/")
            for i, token in enumerate(tokens):
                item = tree[token]
                if item.type == "tree":
                    tree = item
                else:
                    # Safety assertion - blobs are at the end of the path.
                    if i != len(tokens) - 1:
                        raise KeyError(msg % file)
                    return item
                # END handle item type
            # END for each token of split path
            if item == self:
                raise KeyError(msg % file)
            return item
        else:
            for info in self._cache:
                if info[2] == file:  # [2] == name
                    return self._map_id_to_type[info[1] >> 12](
                        self.repo, info[0], info[1], join_path(self.path, info[2])
                    )
            # END for each obj
            raise KeyError(msg % file)
        # END handle long paths

    def __truediv__(self, file: str) -> IndexObjUnion:
        """The ``/`` operator is another syntax for joining.

        See :meth:`join` for details.
        """
        return self.join(file)

    @property
    def trees(self) -> List["Tree"]:
        """:return: list(Tree, ...) List of trees directly below this tree"""
        return [i for i in self if i.type == "tree"]

    @property
    def blobs(self) -> List[Blob]:
        """:return: list(Blob, ...) List of blobs directly below this tree"""
        return [i for i in self if i.type == "blob"]

    @property
    def cache(self) -> TreeModifier:
        """
        :return:
            An object allowing modification of the internal cache. This can be used to
            change the tree's contents. When done, make sure you call
            :meth:`~TreeModifier.set_done` on the tree modifier, or serialization
            behaviour will be incorrect.

        :note:
            See :class:`TreeModifier` for more information on how to alter the cache.
        """
        return TreeModifier(self._cache)

    def traverse(
        self,
        predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True,
        prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False,
        depth: int = -1,
        branch_first: bool = True,
        visit_once: bool = False,
        ignore_self: int = 1,
        as_edge: bool = False,
    ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]:
        """For documentation, see
        `Traversable._traverse() <git.objects.util.Traversable._traverse>`.

        Trees are set to ``visit_once = False`` to gain more performance in the
        traversal.
        """

        # # To typecheck instead of using cast.
        # import itertools
        # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]:
        #     return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1])

        # ret = super().traverse(predicate, prune, depth, branch_first, visit_once, ignore_self)
        # ret_tup = itertools.tee(ret, 2)
        # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}"
        # return ret_tup[0]

        return cast(
            Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]],
            super()._traverse(
                predicate,  # type: ignore[arg-type]
                prune,  # type: ignore[arg-type]
                depth,
                branch_first,
                visit_once,
                ignore_self,
                # Forward the caller's choice; it used to be dropped silently.
                as_edge,
            ),
        )

    def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]:
        """
        :return:
            :class:`~git.util.IterableList` with the results of the traversal as
            produced by :meth:`traverse`

            Tree -> IterableList[Union[Submodule, Tree, Blob]]
        """
        return super()._list_traverse(*args, **kwargs)

    # List protocol

    def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]:
        return list(self._iter_convert_to_object(self._cache[i:j]))

    def __iter__(self) -> Iterator[IndexObjUnion]:
        return self._iter_convert_to_object(self._cache)

    def __len__(self) -> int:
        return len(self._cache)

    def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion:
        """Return the entry at integer position `item`, or the entry named `item`.

        :raise TypeError:
            If `item` is neither an :class:`int` nor a :class:`str`.
        """
        if isinstance(item, int):
            info = self._cache[item]
            return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2]))

        if isinstance(item, str):
            # compatibility
            return self.join(item)
        # END index is basestring

        raise TypeError("Invalid index type: %r" % item)

    def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool:
        """Check direct entries only: index objects are matched by binsha, anything
        else is compared against each entry's path joined onto this tree's path.
        """
        if isinstance(item, IndexObject):
            for info in self._cache:
                if item.binsha == info[0]:
                    return True
                # END compare sha
            # END for each entry
        # END handle item is index object
        # compatibility

        # Treat item as repo-relative path.
        else:
            path = self.path
            for info in self._cache:
                if item == join_path(path, info[2]):
                    return True
        # END for each item
        return False

    def __reversed__(self) -> Iterator[IndexObjUnion]:
        """Iterate over the direct entries in reverse cache order."""
        # reversed() rejects the generator made by _iter_convert_to_object with a
        # TypeError, so reverse the cache list itself and convert entries lazily.
        return self._iter_convert_to_object(reversed(self._cache))

    def _serialize(self, stream: "BytesIO") -> "Tree":
        """Serialize this tree into the stream. Assumes sorted tree data.

        :note:
            We will assume our tree data to be in a sorted state. If this is not the
            case, serialization will not generate a correct tree representation as these
            are assumed to be sorted by algorithms.
        """
        tree_to_stream(self._cache, stream.write)
        return self

    def _deserialize(self, stream: "BytesIO") -> "Tree":
        """Replace the cache with the entries parsed from `stream` and return self."""
        self._cache = tree_entries_from_data(stream.read())
        return self

411 

412 

413# END tree 

414 

# Finalize map definition.
# The Tree entry cannot be part of the class-body literal because the name `Tree`
# is not bound until the class statement has completed.
Tree._map_id_to_type[Tree.tree_id] = Tree