Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/objects/fun.py: 49%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

100 statements  

1# This module is part of GitPython and is released under the 

2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

3 

4"""Functions that are supposed to be as fast as possible.""" 

5 

# Public names of this module; all four are functions defined further down.
__all__ = [
    "tree_to_stream",
    "tree_entries_from_data",
    "traverse_trees_recursive",
    "traverse_tree_recursive",
]

12 

13from stat import S_ISDIR 

14 

15from git.compat import safe_decode, defenc 

16 

17# typing ---------------------------------------------- 

18 

19from typing import ( 

20 Callable, 

21 List, 

22 MutableSequence, 

23 Sequence, 

24 Tuple, 

25 TYPE_CHECKING, 

26 Union, 

27 overload, 

28) 

29 

30if TYPE_CHECKING: 

31 from _typeshed import ReadableBuffer 

32 

33 from git import GitCmdObjectDB 

34 

# A single tree entry as a (binsha, mode, name) tuple.
EntryTup = Tuple[bytes, int, str]  # Same as TreeCacheTup in tree.py.
# An entry slot that may be empty; ``None`` marks a missing or already processed entry.
EntryTupOrNone = Union[EntryTup, None]

37 

38# --------------------------------------------------- 

39 

40 

def tree_to_stream(entries: Sequence[EntryTup], write: Callable[["ReadableBuffer"], Union[int, None]]) -> None:
    """Write the given list of entries into a stream using its ``write`` method.

    Every entry is serialized as the octal mode, a space, the name, a NUL byte and
    the binsha, and is passed to `write` in one call.

    :param entries:
        **Sorted** list of tuples with (binsha, mode, name).

    :param write:
        A ``write`` method which takes a data string.
    """
    # Only the six lowest octal digits (18 bits) of the mode are serialized.
    mode_mask = 0o777777

    for binsha, mode, name in entries:
        # Exactly six zero-padded octal digits.
        mode_str = b"%06o" % (mode & mode_mask)

        # git slices away the first octal if it's zero (only that one digit).
        if mode_str.startswith(b"0"):
            mode_str = mode_str[1:]
        # END save a byte

        # The name has to be bytes before it can be joined with the raw binsha, so
        # a str name is encoded with the default encoding first. Anything that is
        # not a str is passed through untouched.
        if isinstance(name, str):
            name_bytes = name.encode(defenc)
        else:
            name_bytes = name  # type: ignore[unreachable]  # check runtime types - is always str?
        write(b"".join((mode_str, b" ", name_bytes, b"\0", binsha)))
    # END for each item

75 

76 

def tree_entries_from_data(data: bytes) -> List[EntryTup]:
    """Parse the binary representation of a tree into entry tuples.

    Each record consists of octal mode digits, a space, the NUL-terminated name and
    the 20 bytes that follow the NUL. Input that ends before a space or NUL
    separator is found raises :exc:`IndexError`.

    :param data:
        Data block with tree data (as bytes).

    :return:
        list(tuple(binsha, mode, tree_relative_path), ...)
    """
    zero_digit = ord("0")
    space = ord(" ")
    end = len(data)
    entries = []
    pos = 0
    while pos < end:
        # Mode: accumulate octal digits until the separating space. Some git
        # versions truncate the leading 0, some don't, so the length is not fixed.
        mode = 0
        while True:
            byte = data[pos]
            if byte == space:
                break
            # Shift what we have up by one octal digit (3 bits) and add the new one.
            mode = (mode << 3) + (byte - zero_digit)
            pos += 1

        # Name: starts right after the space and runs up to the NUL byte.
        name_start = pos + 1
        pos = name_start
        while data[pos] != 0:
            pos += 1
        name = safe_decode(data[name_start:pos])

        # Sha: the 20 bytes following the NUL byte.
        sha_start = pos + 1
        pos = sha_start + 20
        entries.append((data[sha_start:pos], mode, name))
    return entries

127 

128 

129def _find_by_name(tree_data: MutableSequence[EntryTupOrNone], name: str, is_dir: bool, start_at: int) -> EntryTupOrNone: 

130 """Return data entry matching the given name and tree mode or ``None``. 

131 

132 Before the item is returned, the respective data item is set None in the `tree_data` 

133 list to mark it done. 

134 """ 

135 

136 try: 

137 item = tree_data[start_at] 

138 if item and item[2] == name and S_ISDIR(item[1]) == is_dir: 

139 tree_data[start_at] = None 

140 return item 

141 except IndexError: 

142 pass 

143 # END exception handling 

144 for index, item in enumerate(tree_data): 

145 if item and item[2] == name and S_ISDIR(item[1]) == is_dir: 

146 tree_data[index] = None 

147 return item 

148 # END if item matches 

149 # END for each item 

150 return None 

151 

152 

153@overload 

154def _to_full_path(item: None, path_prefix: str) -> None: ... 

155 

156 

157@overload 

158def _to_full_path(item: EntryTup, path_prefix: str) -> EntryTup: ... 

159 

160 

161def _to_full_path(item: EntryTupOrNone, path_prefix: str) -> EntryTupOrNone: 

162 """Rebuild entry with given path prefix.""" 

163 if not item: 

164 return item 

165 return (item[0], item[1], path_prefix + item[2]) 

166 

167 

def traverse_trees_recursive(
    odb: "GitCmdObjectDB", tree_shas: Sequence[Union[bytes, None]], path_prefix: str
) -> List[Tuple[EntryTupOrNone, ...]]:
    """Walk several trees side by side and line up their entries by name and kind.

    :return:
        List of tuples with entries according to the given binary tree-shas.

        Each tuple holds n tuple|None per blob/commit, (n == len(tree_shas)), where:

        * [0] == 20 byte sha
        * [1] == mode as int
        * [2] == path relative to working tree root

        The entry tuple is ``None`` if the respective blob/commit did not exist in the
        given tree.

    :param tree_shas:
        Iterable of shas pointing to trees. All trees must be on the same level.
        A tree-sha may be ``None``; that tree is then treated as empty and
        contributes ``None`` to every result tuple.

    :param path_prefix:
        A prefix to be added to the returned paths on this level.
        Set it ``""`` for the first iteration.

    :note:
        The ordering of the returned items will be partially lost.
    """
    nt = len(tree_shas)

    # One mutable entry list per tree. Slots are overwritten with None as soon as
    # the entry has been handled.
    trees_data: List[List[EntryTupOrNone]] = []
    for tree_sha in tree_shas:
        if tree_sha is not None:
            # Make new list for typing as list invariant.
            loaded: List[EntryTupOrNone] = list(tree_entries_from_data(odb.stream(tree_sha).read()))
        else:
            loaded = []
        trees_data.append(loaded)

    out: List[Tuple[EntryTupOrNone, ...]] = []

    # For every entry not handled yet, collect its counterparts from all other
    # trees. Directories are descended into together, everything else becomes one
    # result tuple.
    for ti, own_entries in enumerate(trees_data):
        for ii, item in enumerate(own_entries):
            if not item:
                # Already consumed as a counterpart of an earlier entry.
                continue

            _sha, mode, name = item
            is_dir = S_ISDIR(mode)  # Type mode bits

            matched: List[EntryTupOrNone] = [None] * nt
            matched[ti] = item

            # Visit all other trees, wrapping around and stopping just before
            # reaching our own index again.
            for step in range(1, nt):
                other = (ti + step) % nt
                matched[other] = _find_by_name(trees_data[other], name, is_dir, ii)

            if not is_dir:
                out.append(tuple(_to_full_path(m, path_prefix) for m in matched))
            else:
                # Recurse with the sha of every tree that has this directory.
                child_shas = [((m and m[0]) or None) for m in matched]
                out.extend(traverse_trees_recursive(odb, child_shas, path_prefix + name + "/"))

            # Finally mark it done.
            own_entries[ii] = None

        # We are done with one tree, set all its data empty.
        del own_entries[:]
    return out

254 

255 

def traverse_tree_recursive(odb: "GitCmdObjectDB", tree_sha: bytes, path_prefix: str) -> List[EntryTup]:
    """Collect all non-tree entries below a tree, descending into subtrees.

    :return:
        List of entries of the tree pointed to by the binary `tree_sha`.

        An entry has the following format:

        * [0] 20 byte sha
        * [1] mode as int
        * [2] path relative to the repository

    :param path_prefix:
        Prefix to prepend to the front of all returned paths.
    """
    collected = []

    # Unpacking/packing is faster than accessing individual items.
    for sha, mode, name in tree_entries_from_data(odb.stream(tree_sha).read()):
        full_path = path_prefix + name
        if S_ISDIR(mode):
            # Subtrees are not listed themselves, only what they contain.
            collected += traverse_tree_recursive(odb, sha, full_path + "/")
        else:
            collected.append((sha, mode, full_path))

    return collected