Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/index/fun.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

188 statements  

1# This module is part of GitPython and is released under the 

2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

3 

4"""Standalone functions to accompany the index implementation and make it more 

5versatile.""" 

6 

7__all__ = [ 

8 "write_cache", 

9 "read_cache", 

10 "write_tree_from_cache", 

11 "entry_key", 

12 "stat_mode_to_index_mode", 

13 "S_IFGITLINK", 

14 "run_commit_hook", 

15 "hook_path", 

16] 

17 

18from io import BytesIO 

19import os 

20import os.path as osp 

21from pathlib import Path 

22from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR 

23import subprocess 

24import sys 

25 

26from gitdb.base import IStream 

27from gitdb.typ import str_tree_type 

28 

29from git.cmd import handle_process_output, safer_popen 

30from git.compat import defenc, force_bytes, force_text, safe_decode 

31from git.exc import HookExecutionError, UnmergedEntriesError 

32from git.objects.fun import ( 

33 traverse_tree_recursive, 

34 traverse_trees_recursive, 

35 tree_to_stream, 

36) 

37from git.util import IndexFileSHA1Writer, finalize_process 

38 

39from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT 

40from .util import pack, unpack 

41 

42# typing ----------------------------------------------------------------------------- 

43 

44from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast 

45 

46from git.types import PathLike 

47 

48if TYPE_CHECKING: 

49 from git.db import GitCmdObjectDB 

50 from git.objects.tree import TreeCacheTup 

51 

52 from .base import IndexFile 

53 

54# ------------------------------------------------------------------------------------ 

55 

56S_IFGITLINK = S_IFLNK | S_IFDIR 

57"""Flags for a submodule.""" 

58 

59CE_NAMEMASK_INV = ~CE_NAMEMASK 

60 

61 

62def hook_path(name: str, git_dir: PathLike) -> str: 

63 """:return: path to the given named hook in the given git repository directory""" 

64 return osp.join(git_dir, "hooks", name) 

65 

66 

67def _has_file_extension(path: str) -> str: 

68 return osp.splitext(path)[1] 

69 

70 

71def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None: 

72 """Run the commit hook of the given name. Silently ignore hooks that do not exist. 

73 

74 :param name: 

75 Name of hook, like ``pre-commit``. 

76 

77 :param index: 

78 :class:`~git.index.base.IndexFile` instance. 

79 

80 :param args: 

81 Arguments passed to hook file. 

82 

83 :raise git.exc.HookExecutionError: 

84 """ 

85 hp = hook_path(name, index.repo.git_dir) 

86 if not os.access(hp, os.X_OK): 

87 return 

88 

89 env = os.environ.copy() 

90 env["GIT_INDEX_FILE"] = safe_decode(str(index.path)) 

91 env["GIT_EDITOR"] = ":" 

92 cmd = [hp] 

93 try: 

94 if sys.platform == "win32" and not _has_file_extension(hp): 

95 # Windows only uses extensions to determine how to open files 

96 # (doesn't understand shebangs). Try using bash to run the hook. 

97 relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix() 

98 cmd = ["bash.exe", relative_hp] 

99 

100 process = safer_popen( 

101 cmd + list(args), 

102 env=env, 

103 stdout=subprocess.PIPE, 

104 stderr=subprocess.PIPE, 

105 cwd=index.repo.working_dir, 

106 ) 

107 except Exception as ex: 

108 raise HookExecutionError(hp, ex) from ex 

109 else: 

110 stdout_list: List[str] = [] 

111 stderr_list: List[str] = [] 

112 handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process) 

113 stdout = "".join(stdout_list) 

114 stderr = "".join(stderr_list) 

115 if process.returncode != 0: 

116 stdout = force_text(stdout, defenc) 

117 stderr = force_text(stderr, defenc) 

118 raise HookExecutionError(hp, process.returncode, stderr, stdout) 

119 # END handle return code 

120 

121 

122def stat_mode_to_index_mode(mode: int) -> int: 

123 """Convert the given mode from a stat call to the corresponding index mode and 

124 return it.""" 

125 if S_ISLNK(mode): # symlinks 

126 return S_IFLNK 

127 if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK: # submodules 

128 return S_IFGITLINK 

129 return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644) # blobs with or without executable bit 

130 

131 

132def write_cache( 

133 entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]], 

134 stream: IO[bytes], 

135 extension_data: Union[None, bytes] = None, 

136 ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer, 

137) -> None: 

138 """Write the cache represented by entries to a stream. 

139 

140 :param entries: 

141 **Sorted** list of entries. 

142 

143 :param stream: 

144 Stream to wrap into the AdapterStreamCls - it is used for final output. 

145 

146 :param ShaStreamCls: 

147 Type to use when writing to the stream. It produces a sha while writing to it, 

148 before the data is passed on to the wrapped stream. 

149 

150 :param extension_data: 

151 Any kind of data to write as a trailer, it must begin a 4 byte identifier, 

152 followed by its size (4 bytes). 

153 """ 

154 # Wrap the stream into a compatible writer. 

155 stream_sha = ShaStreamCls(stream) 

156 

157 tell = stream_sha.tell 

158 write = stream_sha.write 

159 

160 # Header 

161 version = 2 

162 write(b"DIRC") 

163 write(pack(">LL", version, len(entries))) 

164 

165 # Body 

166 for entry in entries: 

167 beginoffset = tell() 

168 write(entry.ctime_bytes) # ctime 

169 write(entry.mtime_bytes) # mtime 

170 path_str = str(entry.path) 

171 path: bytes = force_bytes(path_str, encoding=defenc) 

172 plen = len(path) & CE_NAMEMASK # Path length 

173 assert plen == len(path), "Path %s too long to fit into index" % entry.path 

174 flags = plen | (entry.flags & CE_NAMEMASK_INV) # Clear possible previous values. 

175 write( 

176 pack( 

177 ">LLLLLL20sH", 

178 entry.dev, 

179 entry.inode, 

180 entry.mode, 

181 entry.uid, 

182 entry.gid, 

183 entry.size, 

184 entry.binsha, 

185 flags, 

186 ) 

187 ) 

188 write(path) 

189 real_size = (tell() - beginoffset + 8) & ~7 

190 write(b"\0" * ((beginoffset + real_size) - tell())) 

191 # END for each entry 

192 

193 # Write previously cached extensions data. 

194 if extension_data is not None: 

195 stream_sha.write(extension_data) 

196 

197 # Write the sha over the content. 

198 stream_sha.write_sha() 

199 

200 

201def read_header(stream: IO[bytes]) -> Tuple[int, int]: 

202 """Return tuple(version_long, num_entries) from the given stream.""" 

203 type_id = stream.read(4) 

204 if type_id != b"DIRC": 

205 raise AssertionError("Invalid index file header: %r" % type_id) 

206 unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2))) 

207 version, num_entries = unpacked 

208 

209 # TODO: Handle version 3: extended data, see read-cache.c. 

210 assert version in (1, 2) 

211 return version, num_entries 

212 

213 

214def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]: 

215 """ 

216 :return: 

217 Key suitable to be used for the 

218 :attr:`index.entries <git.index.base.IndexFile.entries>` dictionary. 

219 

220 :param entry: 

221 One instance of type BaseIndexEntry or the path and the stage. 

222 """ 

223 

224 # def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]: 

225 # return isinstance(entry_key, tuple) and len(entry_key) == 2 

226 

227 if len(entry) == 1: 

228 entry_first = entry[0] 

229 assert isinstance(entry_first, BaseIndexEntry) 

230 return (entry_first.path, entry_first.stage) 

231 else: 

232 # assert is_entry_key_tup(entry) 

233 entry = cast(Tuple[PathLike, int], entry) 

234 return entry 

235 # END handle entry 

236 

237 

238def read_cache( 

239 stream: IO[bytes], 

240) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]: 

241 """Read a cache file from the given stream. 

242 

243 :return: 

244 tuple(version, entries_dict, extension_data, content_sha) 

245 

246 * *version* is the integer version number. 

247 * *entries_dict* is a dictionary which maps IndexEntry instances to a path at a 

248 stage. 

249 * *extension_data* is ``""`` or 4 bytes of type + 4 bytes of size + size bytes. 

250 * *content_sha* is a 20 byte sha on all cache file contents. 

251 """ 

252 version, num_entries = read_header(stream) 

253 count = 0 

254 entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {} 

255 

256 read = stream.read 

257 tell = stream.tell 

258 while count < num_entries: 

259 beginoffset = tell() 

260 ctime = unpack(">8s", read(8))[0] 

261 mtime = unpack(">8s", read(8))[0] 

262 (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2)) 

263 path_size = flags & CE_NAMEMASK 

264 path = read(path_size).decode(defenc) 

265 

266 real_size = (tell() - beginoffset + 8) & ~7 

267 read((beginoffset + real_size) - tell()) 

268 entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size)) 

269 # entry_key would be the method to use, but we save the effort. 

270 entries[(path, entry.stage)] = entry 

271 count += 1 

272 # END for each entry 

273 

274 # The footer contains extension data and a sha on the content so far. 

275 # Keep the extension footer,and verify we have a sha in the end. 

276 # Extension data format is: 

277 # 4 bytes ID 

278 # 4 bytes length of chunk 

279 # Repeated 0 - N times 

280 extension_data = stream.read(~0) 

281 assert len(extension_data) > 19, ( 

282 "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data) 

283 ) 

284 

285 content_sha = extension_data[-20:] 

286 

287 # Truncate the sha in the end as we will dynamically create it anyway. 

288 extension_data = extension_data[:-20] 

289 

290 return (version, entries, extension_data, content_sha) 

291 

292 

293def write_tree_from_cache( 

294 entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0 

295) -> Tuple[bytes, List["TreeCacheTup"]]: 

296 R"""Create a tree from the given sorted list of entries and put the respective 

297 trees into the given object database. 

298 

299 :param entries: 

300 **Sorted** list of :class:`~git.index.typ.IndexEntry`\s. 

301 

302 :param odb: 

303 Object database to store the trees in. 

304 

305 :param si: 

306 Start index at which we should start creating subtrees. 

307 

308 :param sl: 

309 Slice indicating the range we should process on the entries list. 

310 

311 :return: 

312 tuple(binsha, list(tree_entry, ...)) 

313 

314 A tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name. 

315 """ 

316 tree_items: List["TreeCacheTup"] = [] 

317 

318 ci = sl.start 

319 end = sl.stop 

320 while ci < end: 

321 entry = entries[ci] 

322 if entry.stage != 0: 

323 raise UnmergedEntriesError(entry) 

324 # END abort on unmerged 

325 ci += 1 

326 rbound = entry.path.find("/", si) 

327 if rbound == -1: 

328 # It's not a tree. 

329 tree_items.append((entry.binsha, entry.mode, entry.path[si:])) 

330 else: 

331 # Find common base range. 

332 base = entry.path[si:rbound] 

333 xi = ci 

334 while xi < end: 

335 oentry = entries[xi] 

336 orbound = oentry.path.find("/", si) 

337 if orbound == -1 or oentry.path[si:orbound] != base: 

338 break 

339 # END abort on base mismatch 

340 xi += 1 

341 # END find common base 

342 

343 # Enter recursion. 

344 # ci - 1 as we want to count our current item as well. 

345 sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1) 

346 tree_items.append((sha, S_IFDIR, base)) 

347 

348 # Skip ahead. 

349 ci = xi 

350 # END handle bounds 

351 # END for each entry 

352 

353 # Finally create the tree. 

354 sio = BytesIO() 

355 tree_to_stream(tree_items, sio.write) # Writes to stream as bytes, but doesn't change tree_items. 

356 sio.seek(0) 

357 

358 istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio)) 

359 return (istream.binsha, tree_items) 

360 

361 

362def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry: 

363 return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2])) 

364 

365 

366def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]: 

367 R""" 

368 :return: 

369 List of :class:`~git.index.typ.BaseIndexEntry`\s representing the aggressive 

370 merge of the given trees. All valid entries are on stage 0, whereas the 

371 conflicting ones are left on stage 1, 2 or 3, whereas stage 1 corresponds to the 

372 common ancestor tree, 2 to our tree and 3 to 'their' tree. 

373 

374 :param tree_shas: 

375 1, 2 or 3 trees as identified by their binary 20 byte shas. If 1 or two, the 

376 entries will effectively correspond to the last given tree. If 3 are given, a 3 

377 way merge is performed. 

378 """ 

379 out: List[BaseIndexEntry] = [] 

380 

381 # One and two way is the same for us, as we don't have to handle an existing 

382 # index, instrea 

383 if len(tree_shas) in (1, 2): 

384 for entry in traverse_tree_recursive(odb, tree_shas[-1], ""): 

385 out.append(_tree_entry_to_baseindexentry(entry, 0)) 

386 # END for each entry 

387 return out 

388 # END handle single tree 

389 

390 if len(tree_shas) > 3: 

391 raise ValueError("Cannot handle %i trees at once" % len(tree_shas)) 

392 

393 # Three trees. 

394 for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""): 

395 if base is not None: 

396 # Base version exists. 

397 if ours is not None: 

398 # Ours exists. 

399 if theirs is not None: 

400 # It exists in all branches. Ff it was changed in both 

401 # its a conflict. Otherwise, we take the changed version. 

402 # This should be the most common branch, so it comes first. 

403 if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or ( 

404 base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1] 

405 ): 

406 # Changed by both. 

407 out.append(_tree_entry_to_baseindexentry(base, 1)) 

408 out.append(_tree_entry_to_baseindexentry(ours, 2)) 

409 out.append(_tree_entry_to_baseindexentry(theirs, 3)) 

410 elif base[0] != ours[0] or base[1] != ours[1]: 

411 # Only we changed it. 

412 out.append(_tree_entry_to_baseindexentry(ours, 0)) 

413 else: 

414 # Either nobody changed it, or they did. In either 

415 # case, use theirs. 

416 out.append(_tree_entry_to_baseindexentry(theirs, 0)) 

417 # END handle modification 

418 else: 

419 if ours[0] != base[0] or ours[1] != base[1]: 

420 # They deleted it, we changed it, conflict. 

421 out.append(_tree_entry_to_baseindexentry(base, 1)) 

422 out.append(_tree_entry_to_baseindexentry(ours, 2)) 

423 # else: 

424 # # We didn't change it, ignore. 

425 # pass 

426 # END handle our change 

427 # END handle theirs 

428 else: 

429 if theirs is None: 

430 # Deleted in both, its fine - it's out. 

431 pass 

432 else: 

433 if theirs[0] != base[0] or theirs[1] != base[1]: 

434 # Deleted in ours, changed theirs, conflict. 

435 out.append(_tree_entry_to_baseindexentry(base, 1)) 

436 out.append(_tree_entry_to_baseindexentry(theirs, 3)) 

437 # END theirs changed 

438 # else: 

439 # # Theirs didn't change. 

440 # pass 

441 # END handle theirs 

442 # END handle ours 

443 else: 

444 # All three can't be None. 

445 if ours is None: 

446 # Added in their branch. 

447 assert theirs is not None 

448 out.append(_tree_entry_to_baseindexentry(theirs, 0)) 

449 elif theirs is None: 

450 # Added in our branch. 

451 out.append(_tree_entry_to_baseindexentry(ours, 0)) 

452 else: 

453 # Both have it, except for the base, see whether it changed. 

454 if ours[0] != theirs[0] or ours[1] != theirs[1]: 

455 out.append(_tree_entry_to_baseindexentry(ours, 2)) 

456 out.append(_tree_entry_to_baseindexentry(theirs, 3)) 

457 else: 

458 # It was added the same in both. 

459 out.append(_tree_entry_to_baseindexentry(ours, 0)) 

460 # END handle two items 

461 # END handle heads 

462 # END handle base exists 

463 # END for each entries tuple 

464 

465 return out