# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/

"""Standalone functions to accompany the index implementation and make it more
versatile."""

__all__ = [
    "write_cache",
    "read_cache",
    "write_tree_from_cache",
    "entry_key",
    "stat_mode_to_index_mode",
    "S_IFGITLINK",
    "run_commit_hook",
    "hook_path",
]

from io import BytesIO
import os
import os.path as osp
from pathlib import Path
from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR
import subprocess
import sys

from gitdb.base import IStream
from gitdb.typ import str_tree_type

from git.cmd import handle_process_output, safer_popen
from git.compat import defenc, force_bytes, force_text, safe_decode
from git.exc import HookExecutionError, UnmergedEntriesError
from git.objects.fun import (
    traverse_tree_recursive,
    traverse_trees_recursive,
    tree_to_stream,
)
from git.util import IndexFileSHA1Writer, finalize_process

from .typ import CE_EXTENDED, BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
from .util import pack, unpack

# typing -----------------------------------------------------------------------------

from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast

from git.types import PathLike

if TYPE_CHECKING:
    from git.db import GitCmdObjectDB
    from git.objects.tree import TreeCacheTup

    from .base import IndexFile

# ------------------------------------------------------------------------------------

S_IFGITLINK = S_IFLNK | S_IFDIR
"""Flags for a submodule."""

CE_NAMEMASK_INV = ~CE_NAMEMASK


def hook_path(name: str, git_dir: PathLike) -> str:
    """:return: path to the given named hook in the given git repository directory"""
    return osp.join(git_dir, "hooks", name)
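

# Illustrative sketch, not part of the original module: the hook path is simply
# <git_dir>/hooks/<name>. The git_dir below is a hypothetical example value.
def _example_hook_path() -> None:
    expected = osp.join("/tmp/example-repo/.git", "hooks", "pre-commit")
    assert hook_path("pre-commit", "/tmp/example-repo/.git") == expected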


def _has_file_extension(path: str) -> str:
    """Return the extension of ``path`` (e.g. ``".sample"``), or ``""`` if it has
    none. The extension doubles as a truth value for "has an extension"."""
    return osp.splitext(path)[1]


def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
    """Run the commit hook of the given name. Silently ignore hooks that do not exist.

    :param name:
        Name of hook, like ``pre-commit``.

    :param index:
        :class:`~git.index.base.IndexFile` instance.

    :param args:
        Arguments passed to the hook file.

    :raise git.exc.HookExecutionError:
    """
    hp = hook_path(name, index.repo.git_dir)
    if not os.access(hp, os.X_OK):
        return

    env = os.environ.copy()
    env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
    env["GIT_EDITOR"] = ":"
    cmd = [hp]
    try:
        if sys.platform == "win32" and not _has_file_extension(hp):
            # Windows only uses extensions to determine how to open files
            # (doesn't understand shebangs). Try using bash to run the hook.
            relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
            cmd = ["bash.exe", relative_hp]

        process = safer_popen(
            cmd + list(args),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=index.repo.working_dir,
        )
    except Exception as ex:
        raise HookExecutionError(hp, ex) from ex
    else:
        stdout_list: List[str] = []
        stderr_list: List[str] = []
        handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process)
        stdout = "".join(stdout_list)
        stderr = "".join(stderr_list)
        if process.returncode != 0:
            stdout = force_text(stdout, defenc)
            stderr = force_text(stderr, defenc)
            raise HookExecutionError(hp, process.returncode, stderr, stdout)
    # END handle return code
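

# Hedged usage sketch, not part of the original module: shows how run_commit_hook is
# typically driven. The repository path is hypothetical, and the call is a no-op when
# .git/hooks/pre-commit is absent or not executable.
def _example_run_commit_hook() -> None:
    import git  # GitPython's public entry point

    repo = git.Repo("/tmp/example-repo")  # hypothetical existing repository
    try:
        run_commit_hook("pre-commit", repo.index)
    except HookExecutionError as err:
        print("pre-commit hook rejected the commit:", err)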


def stat_mode_to_index_mode(mode: int) -> int:
    """Convert the given mode from a stat call to the corresponding index mode and
    return it."""
    if S_ISLNK(mode):  # symlinks
        return S_IFLNK
    if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK:  # submodules
        return S_IFGITLINK
    return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644)  # blobs with or without executable bit
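

# Hedged sketch, not part of the original module: the three cases of the mapping.
# Only the executable bit survives for regular files; everything else is normalized.
def _example_stat_mode_to_index_mode() -> None:
    assert stat_mode_to_index_mode(S_IFREG | 0o644) == S_IFREG | 0o644  # plain file
    assert stat_mode_to_index_mode(S_IFREG | 0o744) == S_IFREG | 0o755  # any u+x becomes 755
    assert stat_mode_to_index_mode(S_IFLNK | 0o777) == S_IFLNK  # symlink, permissions dropped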


def write_cache(
    entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
    stream: IO[bytes],
    extension_data: Union[None, bytes] = None,
    ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
    """Write the cache represented by entries to a stream.

    :param entries:
        **Sorted** list of entries.

    :param stream:
        Stream to wrap into the ShaStreamCls - it is used for final output.

    :param extension_data:
        Any kind of data to write as a trailer. It must begin with a 4-byte
        identifier, followed by its size as a 4-byte number.

    :param ShaStreamCls:
        Type to use when writing to the stream. It produces a sha while writing to
        it, before the data is passed on to the wrapped stream.
    """
    # Wrap the stream into a compatible writer.
    stream_sha = ShaStreamCls(stream)

    tell = stream_sha.tell
    write = stream_sha.write

    # Header
    version = 3 if any(entry.extended_flags for entry in entries) else 2
    write(b"DIRC")
    write(pack(">LL", version, len(entries)))

    # Body
    for entry in entries:
        beginoffset = tell()
        write(entry.ctime_bytes)  # ctime
        write(entry.mtime_bytes)  # mtime
        path_str = str(entry.path)
        path: bytes = force_bytes(path_str, encoding=defenc)
        plen = len(path) & CE_NAMEMASK  # Path length
        assert plen == len(path), "Path %s too long to fit into index" % entry.path
        flags = plen | (entry.flags & CE_NAMEMASK_INV)  # Clear possible previous values.
        if entry.extended_flags:
            flags |= CE_EXTENDED
        write(
            pack(
                ">LLLLLL20sH",
                entry.dev,
                entry.inode,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.binsha,
                flags,
            )
        )
        if entry.extended_flags:
            write(pack(">H", entry.extended_flags))
        write(path)
        real_size = (tell() - beginoffset + 8) & ~7  # Pad the entry to a multiple of 8 bytes.
        write(b"\0" * ((beginoffset + real_size) - tell()))
    # END for each entry

    # Write previously cached extensions data.
    if extension_data is not None:
        stream_sha.write(extension_data)

    # Write the sha over the content.
    stream_sha.write_sha()
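

# Hedged round-trip sketch, not part of the original module: writes one hand-built
# entry to an in-memory stream and parses it back with read_cache() below. All field
# values (null sha, zeroed stat data, the path) are placeholders, not real repository
# data; the 12-tuple mirrors the field order used by read_cache().
def _example_cache_round_trip() -> None:
    entry = IndexEntry(
        (S_IFREG | 0o644, b"\x00" * 20, 0, "hello.txt", b"\x00" * 8, b"\x00" * 8, 0, 0, 0, 0, 0, 0)
    )
    buf = BytesIO()
    write_cache([entry], buf)

    buf.seek(0)
    version, entries, extension_data, content_sha = read_cache(buf)
    assert version == 2 and ("hello.txt", 0) in entries and extension_data == b""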


def read_header(stream: IO[bytes]) -> Tuple[int, int]:
    """Return tuple(version_long, num_entries) from the given stream."""
    type_id = stream.read(4)
    if type_id != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % type_id)
    unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
    version, num_entries = unpacked

    assert version in (1, 2, 3), "Unsupported git index version %i, only 1, 2, and 3 are supported" % version
    return version, num_entries


def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
    """
    :return:
        Key suitable to be used for the
        :attr:`index.entries <git.index.base.IndexFile.entries>` dictionary.

    :param entry:
        Either a single :class:`~git.index.typ.BaseIndexEntry` instance, or a path
        and a stage.
    """

    # def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]:
    #     return isinstance(entry_key, tuple) and len(entry_key) == 2

    if len(entry) == 1:
        entry_first = entry[0]
        assert isinstance(entry_first, BaseIndexEntry)
        return (entry_first.path, entry_first.stage)
    else:
        # assert is_entry_key_tup(entry)
        entry = cast(Tuple[PathLike, int], entry)
        return entry
    # END handle entry
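

# Hedged sketch, not part of the original module: both calling conventions produce
# the same dictionary key. The entry's field values are placeholders.
def _example_entry_key() -> None:
    entry = BaseIndexEntry((S_IFREG | 0o644, b"\x00" * 20, 0, "hello.txt"))
    assert entry_key(entry) == entry_key("hello.txt", 0) == ("hello.txt", 0)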


def read_cache(
    stream: IO[bytes],
) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
    """Read a cache file from the given stream.

    :return:
        tuple(version, entries_dict, extension_data, content_sha)

        * *version* is the integer version number.
        * *entries_dict* is a dictionary which maps a (path, stage) tuple to its
          IndexEntry instance.
        * *extension_data* is ``b""`` or 4 bytes of type + 4 bytes of size + size bytes.
        * *content_sha* is a 20 byte sha on all cache file contents.
    """
    version, num_entries = read_header(stream)
    count = 0
    entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}

    read = stream.read
    tell = stream.tell
    while count < num_entries:
        beginoffset = tell()
        ctime = unpack(">8s", read(8))[0]
        mtime = unpack(">8s", read(8))[0]
        (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
        extended_flags = 0
        if flags & CE_EXTENDED:
            extended_flags = unpack(">H", read(2))[0]
        path_size = flags & CE_NAMEMASK
        path = read(path_size).decode(defenc)

        real_size = (tell() - beginoffset + 8) & ~7  # Entries are padded to a multiple of 8 bytes.
        read((beginoffset + real_size) - tell())
        entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size, extended_flags))
        # entry_key would be the method to use, but we save the effort.
        entries[(path, entry.stage)] = entry
        count += 1
    # END for each entry

    # The footer contains extension data and a sha on the content so far.
    # Keep the extension footer, and verify we have a sha in the end.
    # Extension data format is:
    #   4 bytes ID
    #   4 bytes length of chunk
    # Repeated 0 - N times
    extension_data = stream.read(~0)  # ~0 == -1: read everything that is left.
    assert len(extension_data) > 19, (
        "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)
    )

    content_sha = extension_data[-20:]

    # Truncate the sha in the end as we will dynamically create it anyway.
    extension_data = extension_data[:-20]

    return (version, entries, extension_data, content_sha)
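

# Hedged sketch, not part of the original module: parses a real on-disk index file
# and prints something akin to `git ls-files --stage`. The repository path is
# hypothetical.
def _example_read_repository_index() -> None:
    with open("/tmp/example-repo/.git/index", "rb") as fp:
        version, entries, _extension_data, _content_sha = read_cache(fp)
    for (path, stage), entry in sorted(entries.items()):
        print("%06o %s %i\t%s" % (entry.mode, entry.hexsha, stage, path))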


def write_tree_from_cache(
    entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
) -> Tuple[bytes, List["TreeCacheTup"]]:
    R"""Create a tree from the given sorted list of entries and put the respective
    trees into the given object database.

    :param entries:
        **Sorted** list of :class:`~git.index.typ.IndexEntry`\s.

    :param odb:
        Object database to store the trees in.

    :param sl:
        Slice indicating the range we should process on the entries list.

    :param si:
        Start index at which we should start creating subtrees.

    :return:
        tuple(binsha, list(tree_entry, ...))

        A tuple of a sha and a list of tree entries, each being a tuple of binsha,
        mode and name.
    """
    tree_items: List["TreeCacheTup"] = []

    ci = sl.start
    end = sl.stop
    while ci < end:
        entry = entries[ci]
        if entry.stage != 0:
            raise UnmergedEntriesError(entry)
        # END abort on unmerged
        ci += 1
        rbound = entry.path.find("/", si)
        if rbound == -1:
            # It's not a tree.
            tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
        else:
            # Find common base range.
            base = entry.path[si:rbound]
            xi = ci
            while xi < end:
                oentry = entries[xi]
                orbound = oentry.path.find("/", si)
                if orbound == -1 or oentry.path[si:orbound] != base:
                    break
                # END abort on base mismatch
                xi += 1
            # END find common base

            # Enter recursion.
            # ci - 1 as we want to count our current item as well.
            sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
            tree_items.append((sha, S_IFDIR, base))

            # Skip ahead.
            ci = xi
        # END handle bounds
    # END for each entry

    # Finally create the tree.
    sio = BytesIO()
    tree_to_stream(tree_items, sio.write)  # Writes to stream as bytes, but doesn't change tree_items.
    sio.seek(0)

    istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
    return (istream.binsha, tree_items)
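

# Hedged sketch, not part of the original module: persists the sorted entries of a
# hypothetical repository's index as tree objects, similar to what
# IndexFile.write_tree does (which writes to a temporary database rather than
# directly to repo.odb).
def _example_write_tree_from_cache() -> None:
    import git

    repo = git.Repo("/tmp/example-repo")  # hypothetical existing repository
    sorted_entries = [repo.index.entries[key] for key in sorted(repo.index.entries)]
    binsha, tree_items = write_tree_from_cache(sorted_entries, repo.odb, slice(0, len(sorted_entries)))
    print("root tree %s with %i direct items" % (binsha.hex(), len(tree_items)))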


def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
    return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))


def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
    R"""
    :return:
        List of :class:`~git.index.typ.BaseIndexEntry`\s representing the aggressive
        merge of the given trees. All valid entries are on stage 0, while the
        conflicting ones are left on stage 1, 2 or 3, where stage 1 corresponds to
        the common ancestor tree, 2 to our tree and 3 to 'their' tree.

    :param tree_shas:
        1, 2 or 3 trees as identified by their binary 20 byte shas. If one or two
        are given, the entries will effectively correspond to the last given tree.
        If three are given, a 3-way merge is performed.
    """
    out: List[BaseIndexEntry] = []

    # One and two way merges are the same for us, as we don't have to handle an
    # existing index; instead, the entries of the last given tree are taken as-is.
    if len(tree_shas) in (1, 2):
        for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
            out.append(_tree_entry_to_baseindexentry(entry, 0))
        # END for each entry
        return out
    # END handle single tree

    if len(tree_shas) > 3:
        raise ValueError("Cannot handle %i trees at once" % len(tree_shas))

    # Three trees.
    for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
        if base is not None:
            # Base version exists.
            if ours is not None:
                # Ours exists.
                if theirs is not None:
                    # It exists in all branches. If it was changed in both,
                    # it's a conflict. Otherwise, we take the changed version.
                    # This should be the most common branch, so it comes first.
                    if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
                        base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
                    ):
                        # Changed by both.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    elif base[0] != ours[0] or base[1] != ours[1]:
                        # Only we changed it.
                        out.append(_tree_entry_to_baseindexentry(ours, 0))
                    else:
                        # Either nobody changed it, or they did. In either
                        # case, use theirs.
                        out.append(_tree_entry_to_baseindexentry(theirs, 0))
                    # END handle modification
                else:
                    if ours[0] != base[0] or ours[1] != base[1]:
                        # They deleted it, we changed it, conflict.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                    # else:
                    #     # We didn't change it, ignore.
                    #     pass
                    # END handle our change
                # END handle theirs
            else:
                if theirs is None:
                    # Deleted in both, it's fine - it's out.
                    pass
                else:
                    if theirs[0] != base[0] or theirs[1] != base[1]:
                        # Deleted in ours, changed in theirs, conflict.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    # END theirs changed
                    # else:
                    #     # Theirs didn't change.
                    #     pass
                # END handle theirs
            # END handle ours
        else:
            # All three can't be None.
            if ours is None:
                # Added in their branch.
                assert theirs is not None
                out.append(_tree_entry_to_baseindexentry(theirs, 0))
            elif theirs is None:
                # Added in our branch.
                out.append(_tree_entry_to_baseindexentry(ours, 0))
            else:
                # Both have it, except for the base, see whether it changed.
                if ours[0] != theirs[0] or ours[1] != theirs[1]:
                    out.append(_tree_entry_to_baseindexentry(ours, 2))
                    out.append(_tree_entry_to_baseindexentry(theirs, 3))
                else:
                    # It was added the same in both.
                    out.append(_tree_entry_to_baseindexentry(ours, 0))
                # END handle two items
            # END handle heads
        # END handle base exists
    # END for each entries tuple

    return out
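

# Hedged sketch, not part of the original module: a three-way merge of the trees
# behind a merge base and two branch heads. Repository path and branch names are
# hypothetical.
def _example_aggressive_tree_merge() -> None:
    import git

    repo = git.Repo("/tmp/example-repo")  # hypothetical existing repository
    ours = repo.commit("main")
    theirs = repo.commit("feature")
    base = repo.merge_base(ours, theirs)[0]
    merged = aggressive_tree_merge(repo.odb, [c.tree.binsha for c in (base, ours, theirs)])
    conflicts = sorted({e.path for e in merged if e.stage != 0})
    print("%i entries, %i conflicting paths" % (len(merged), len(conflicts)))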