Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/objects/tree.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

150 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6__all__ = ["TreeModifier", "Tree"] 

7 

8import sys 

9 

10import git.diff as git_diff 

11from git.util import IterableList, join_path, to_bin_sha 

12 

13from . import util 

14from .base import IndexObjUnion, IndexObject 

15from .blob import Blob 

16from .fun import tree_entries_from_data, tree_to_stream 

17from .submodule.base import Submodule 

18 

19# typing ------------------------------------------------- 

20 

21from typing import ( 

22 Any, 

23 Callable, 

24 Dict, 

25 Iterable, 

26 Iterator, 

27 List, 

28 Tuple, 

29 TYPE_CHECKING, 

30 Type, 

31 Union, 

32 cast, 

33) 

34 

35if sys.version_info >= (3, 8): 

36 from typing import Literal 

37else: 

38 from typing_extensions import Literal 

39 

40from git.types import PathLike 

41 

42if TYPE_CHECKING: 

43 from io import BytesIO 

44 

45 from git.repo import Repo 

46 

47TreeCacheTup = Tuple[bytes, int, str] 

48 

49TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]] 

50 

51# -------------------------------------------------------- 

52 

53cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b) 

54 

55 

56class TreeModifier: 

57 """A utility class providing methods to alter the underlying cache in a list-like 

58 fashion. 

59 

60 Once all adjustments are complete, the :attr:`_cache`, which really is a reference 

61 to the cache of a tree, will be sorted. This ensures it will be in a serializable 

62 state. 

63 """ 

64 

65 __slots__ = ("_cache",) 

66 

67 def __init__(self, cache: List[TreeCacheTup]) -> None: 

68 self._cache = cache 

69 

70 def _index_by_name(self, name: str) -> int: 

71 """:return: index of an item with name, or -1 if not found""" 

72 for i, t in enumerate(self._cache): 

73 if t[2] == name: 

74 return i 

75 # END found item 

76 # END for each item in cache 

77 return -1 

78 

79 # { Interface 

80 def set_done(self) -> "TreeModifier": 

81 """Call this method once you are done modifying the tree information. 

82 

83 This may be called several times, but be aware that each call will cause a sort 

84 operation. 

85 

86 :return: 

87 self 

88 """ 

89 self._cache.sort(key=lambda x: (x[2] + "/") if x[1] == Tree.tree_id << 12 else x[2]) 

90 return self 

91 

92 # } END interface 

93 

94 # { Mutators 

95 def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier": 

96 """Add the given item to the tree. 

97 

98 If an item with the given name already exists, nothing will be done, but a 

99 :exc:`ValueError` will be raised if the sha and mode of the existing item do not 

100 match the one you add, unless `force` is ``True``. 

101 

102 :param sha: 

103 The 20 or 40 byte sha of the item to add. 

104 

105 :param mode: 

106 :class:`int` representing the stat-compatible mode of the item. 

107 

108 :param force: 

109 If ``True``, an item with your name and information will overwrite any 

110 existing item with the same name, no matter which information it has. 

111 

112 :return: 

113 self 

114 """ 

115 if "/" in name: 

116 raise ValueError("Name must not contain '/' characters") 

117 if (mode >> 12) not in Tree._map_id_to_type: 

118 raise ValueError("Invalid object type according to mode %o" % mode) 

119 

120 sha = to_bin_sha(sha) 

121 index = self._index_by_name(name) 

122 

123 item = (sha, mode, name) 

124 

125 if index == -1: 

126 self._cache.append(item) 

127 else: 

128 if force: 

129 self._cache[index] = item 

130 else: 

131 ex_item = self._cache[index] 

132 if ex_item[0] != sha or ex_item[1] != mode: 

133 raise ValueError("Item %r existed with different properties" % name) 

134 # END handle mismatch 

135 # END handle force 

136 # END handle name exists 

137 return self 

138 

139 def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None: 

140 """Add the given item to the tree. Its correctness is assumed, so it is the 

141 caller's responsibility to ensure that the input is correct. 

142 

143 For more information on the parameters, see :meth:`add`. 

144 

145 :param binsha: 

146 20 byte binary sha. 

147 """ 

148 assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) 

149 tree_cache = (binsha, mode, name) 

150 

151 self._cache.append(tree_cache) 

152 

153 def __delitem__(self, name: str) -> None: 

154 """Delete an item with the given name if it exists.""" 

155 index = self._index_by_name(name) 

156 if index > -1: 

157 del self._cache[index] 

158 

159 # } END mutators 

160 

161 

162class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): 

163 R"""Tree objects represent an ordered list of :class:`~git.objects.blob.Blob`\s and 

164 other :class:`Tree`\s. 

165 

166 See :manpage:`gitglossary(7)` on "tree object": 

167 https://git-scm.com/docs/gitglossary#def_tree_object 

168 

169 Subscripting is supported, as with a list or dict: 

170 

171 * Access a specific blob using the ``tree["filename"]`` notation. 

172 * You may likewise access by index, like ``blob = tree[0]``. 

173 """ 

174 

175 type: Literal["tree"] = "tree" 

176 

177 __slots__ = ("_cache",) 

178 

179 # Actual integer IDs for comparison. 

180 commit_id = 0o16 # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link. 

181 blob_id = 0o10 

182 symlink_id = 0o12 

183 tree_id = 0o04 

184 

185 _map_id_to_type: Dict[int, Type[IndexObjUnion]] = { 

186 commit_id: Submodule, 

187 blob_id: Blob, 

188 symlink_id: Blob, 

189 # Tree ID added once Tree is defined. 

190 } 

191 

192 def __init__( 

193 self, 

194 repo: "Repo", 

195 binsha: bytes, 

196 mode: int = tree_id << 12, 

197 path: Union[PathLike, None] = None, 

198 ): 

199 super().__init__(repo, binsha, mode, path) 

200 

201 @classmethod 

202 def _get_intermediate_items( 

203 cls, 

204 index_object: IndexObjUnion, 

205 ) -> Union[Tuple["Tree", ...], Tuple[()]]: 

206 if index_object.type == "tree": 

207 return tuple(index_object._iter_convert_to_object(index_object._cache)) 

208 return () 

209 

210 def _set_cache_(self, attr: str) -> None: 

211 if attr == "_cache": 

212 # Set the data when we need it. 

213 ostream = self.repo.odb.stream(self.binsha) 

214 self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read()) 

215 else: 

216 super()._set_cache_(attr) 

217 # END handle attribute 

218 

219 def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]: 

220 """Iterable yields tuples of (binsha, mode, name), which will be converted to 

221 the respective object representation. 

222 """ 

223 for binsha, mode, name in iterable: 

224 path = join_path(self.path, name) 

225 try: 

226 yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path) 

227 except KeyError as e: 

228 raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e 

229 # END for each item 

230 

231 def join(self, file: str) -> IndexObjUnion: 

232 """Find the named object in this tree's contents. 

233 

234 :return: 

235 :class:`~git.objects.blob.Blob`, :class:`Tree`, or 

236 :class:`~git.objects.submodule.base.Submodule` 

237 

238 :raise KeyError: 

239 If the given file or tree does not exist in this tree. 

240 """ 

241 msg = "Blob or Tree named %r not found" 

242 if "/" in file: 

243 tree = self 

244 item = self 

245 tokens = file.split("/") 

246 for i, token in enumerate(tokens): 

247 item = tree[token] 

248 if item.type == "tree": 

249 tree = item 

250 else: 

251 # Safety assertion - blobs are at the end of the path. 

252 if i != len(tokens) - 1: 

253 raise KeyError(msg % file) 

254 return item 

255 # END handle item type 

256 # END for each token of split path 

257 if item == self: 

258 raise KeyError(msg % file) 

259 return item 

260 else: 

261 for info in self._cache: 

262 if info[2] == file: # [2] == name 

263 return self._map_id_to_type[info[1] >> 12]( 

264 self.repo, info[0], info[1], join_path(self.path, info[2]) 

265 ) 

266 # END for each obj 

267 raise KeyError(msg % file) 

268 # END handle long paths 

269 

270 def __truediv__(self, file: str) -> IndexObjUnion: 

271 """The ``/`` operator is another syntax for joining. 

272 

273 See :meth:`join` for details. 

274 """ 

275 return self.join(file) 

276 

277 @property 

278 def trees(self) -> List["Tree"]: 

279 """:return: list(Tree, ...) List of trees directly below this tree""" 

280 return [i for i in self if i.type == "tree"] 

281 

282 @property 

283 def blobs(self) -> List[Blob]: 

284 """:return: list(Blob, ...) List of blobs directly below this tree""" 

285 return [i for i in self if i.type == "blob"] 

286 

287 @property 

288 def cache(self) -> TreeModifier: 

289 """ 

290 :return: 

291 An object allowing modification of the internal cache. This can be used to 

292 change the tree's contents. When done, make sure you call 

293 :meth:`~TreeModifier.set_done` on the tree modifier, or serialization 

294 behaviour will be incorrect. 

295 

296 :note: 

297 See :class:`TreeModifier` for more information on how to alter the cache. 

298 """ 

299 return TreeModifier(self._cache) 

300 

301 def traverse( 

302 self, 

303 predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True, 

304 prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False, 

305 depth: int = -1, 

306 branch_first: bool = True, 

307 visit_once: bool = False, 

308 ignore_self: int = 1, 

309 as_edge: bool = False, 

310 ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]: 

311 """For documentation, see 

312 `Traversable._traverse() <git.objects.util.Traversable._traverse>`. 

313 

314 Trees are set to ``visit_once = False`` to gain more performance in the 

315 traversal. 

316 """ 

317 

318 # # To typecheck instead of using cast. 

319 # import itertools 

320 # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]: 

321 # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1]) 

322 

323 # ret = super().traverse(predicate, prune, depth, branch_first, visit_once, ignore_self) 

324 # ret_tup = itertools.tee(ret, 2) 

325 # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}" 

326 # return ret_tup[0] 

327 

328 return cast( 

329 Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]], 

330 super()._traverse( 

331 predicate, # type: ignore[arg-type] 

332 prune, # type: ignore[arg-type] 

333 depth, 

334 branch_first, 

335 visit_once, 

336 ignore_self, 

337 ), 

338 ) 

339 

340 def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]: 

341 """ 

342 :return: 

343 :class:`~git.util.IterableList` with the results of the traversal as 

344 produced by :meth:`traverse` 

345 

346 Tree -> IterableList[Union[Submodule, Tree, Blob]] 

347 """ 

348 return super()._list_traverse(*args, **kwargs) 

349 

350 # List protocol 

351 

352 def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]: 

353 return list(self._iter_convert_to_object(self._cache[i:j])) 

354 

355 def __iter__(self) -> Iterator[IndexObjUnion]: 

356 return self._iter_convert_to_object(self._cache) 

357 

358 def __len__(self) -> int: 

359 return len(self._cache) 

360 

361 def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion: 

362 if isinstance(item, int): 

363 info = self._cache[item] 

364 return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2])) 

365 

366 if isinstance(item, str): 

367 # compatibility 

368 return self.join(item) 

369 # END index is basestring 

370 

371 raise TypeError("Invalid index type: %r" % item) 

372 

373 def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool: 

374 if isinstance(item, IndexObject): 

375 for info in self._cache: 

376 if item.binsha == info[0]: 

377 return True 

378 # END compare sha 

379 # END for each entry 

380 # END handle item is index object 

381 # compatibility 

382 

383 # Treat item as repo-relative path. 

384 else: 

385 path = self.path 

386 for info in self._cache: 

387 if item == join_path(path, info[2]): 

388 return True 

389 # END for each item 

390 return False 

391 

392 def __reversed__(self) -> Iterator[IndexObjUnion]: 

393 return reversed(self._iter_convert_to_object(self._cache)) # type: ignore[call-overload] 

394 

395 def _serialize(self, stream: "BytesIO") -> "Tree": 

396 """Serialize this tree into the stream. Assumes sorted tree data. 

397 

398 :note: 

399 We will assume our tree data to be in a sorted state. If this is not the 

400 case, serialization will not generate a correct tree representation as these 

401 are assumed to be sorted by algorithms. 

402 """ 

403 tree_to_stream(self._cache, stream.write) 

404 return self 

405 

406 def _deserialize(self, stream: "BytesIO") -> "Tree": 

407 self._cache = tree_entries_from_data(stream.read()) 

408 return self 

409 

410 

411# END tree 

412 

413# Finalize map definition. 

414Tree._map_id_to_type[Tree.tree_id] = Tree