Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%

204 statements  

# patch.py -- For dealing with packed-style patches.
# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Classes for dealing with git am-style patches.

These patches are basically unified diffs with some extra metadata tacked
on.
"""


import email.parser
import time
from collections.abc import Generator
from difflib import SequenceMatcher
from typing import (
    TYPE_CHECKING,
    BinaryIO,
    Optional,
    TextIO,
    Union,
)

if TYPE_CHECKING:
    import email.message

    from .object_store import BaseObjectStore

from .objects import S_ISGITLINK, Blob, Commit

FIRST_FEW_BYTES = 8000


def write_commit_patch(
    f: BinaryIO,
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual file patch.

    Args:
      f: File-like object to write to
      commit: Commit object
      contents: Patch contents (unified diff) as text or bytes
      progress: Tuple with current patch number and total.
      version: Version string for the trailer; defaults to the Dulwich version
      encoding: Encoding to use when writing
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        if encoding is None:
            encoding = "ascii"
        f.write(version.encode(encoding) + b"\n")

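
# A minimal usage sketch (not part of patch.py): assemble a bare Commit in
# memory and render it as an am-style patch with write_commit_patch(). The
# helper name, file name, author, timestamp and version string are invented
# for illustration; it assumes a Commit can be built by setting its fields
# directly.
def _example_write_commit_patch() -> bytes:
    import io

    from dulwich.objects import Tree

    blob = Blob.from_string(b"hello\n")
    tree = Tree()
    tree.add(b"hello.txt", 0o100644, blob.id)

    commit = Commit()
    commit.tree = tree.id
    commit.author = commit.committer = b"Jane Doe <jane@example.com>"
    commit.author_time = commit.commit_time = 1700000000
    commit.author_timezone = commit.commit_timezone = 0
    commit.message = b"Add hello.txt"

    buf = io.BytesIO()
    write_commit_patch(buf, commit, contents=b"", progress=(1, 1), version="2.43.0")
    return buf.getvalue()
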

def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string
    """
    decoded = commit.message.decode(errors="replace")
    return decoded.splitlines()[0].replace(" ", "-")

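
# Illustrative sketch (commit message invented): get_summary() reduces the
# first line of a commit message to a filename-friendly string.
def _example_get_summary() -> str:
    c = Commit()
    c.message = b"Fix crash when index is empty\n\nLonger description here.\n"
    return get_summary(c)  # -> "Fix-crash-when-index-is-empty"
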

# Unified Diff
def _format_range_unified(start: int, stop: int) -> str:
    """Convert range to the "ed" format."""
    # Per the diff spec at http://www.unix.org/single_unix_specification/
    beginning = start + 1  # lines start numbering with one
    length = stop - start
    if length == 1:
        return f"{beginning}"
    if not length:
        beginning -= 1  # empty ranges begin at line just before the range
    return f"{beginning},{length}"


def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as
    original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """
    started = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not started:
            started = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        file1_range = _format_range_unified(first[1], last[2])
        file2_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    if not line[-1:] == b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"-" + line
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    if not line[-1:] == b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"+" + line

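
# A small sketch of unified_diff() on in-memory line lists; the file names and
# contents are made-up values. Note that inputs and outputs are bytes, and the
# second list deliberately ends without a newline to show the
# "\ No newline at end of file" marker.
def _example_unified_diff() -> bytes:
    old = [b"line one\n", b"line two\n"]
    new = [b"line one\n", b"line 2"]  # no trailing newline on the last line
    return b"".join(
        unified_diff(old, new, fromfile=b"a/example.txt", tofile=b"b/example.txt")
    )
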

def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    return b"\0" in content[:FIRST_FEW_BYTES]


def shortid(hexsha: Optional[bytes]) -> bytes:
    """Return an abbreviated (7-character) form of a hex SHA, or zeroes if None."""
    if hexsha is None:
        return b"0" * 7
    else:
        return hexsha[:7]


def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Return the path as used in a patch header, or /dev/null if missing."""
    if p is None:
        return b"/dev/null"
    else:
        return root + b"/" + p


def write_object_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        from typing import cast

        if hexsha is None:
            return cast(Blob, Blob.from_string(b""))
        elif mode is not None and S_ISGITLINK(mode):
            return cast(Blob, Blob.from_string(b"Subproject commit " + hexsha + b"\n"))
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return cast(Blob, Blob.from_string(obj.as_raw_string()))

    def lines(content: "Blob") -> list[bytes]:
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
            )
        )

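
# Hedged usage sketch: store two blobs in dulwich's in-memory object store and
# diff them by SHA with write_object_diff(). The path, contents and mode are
# illustrative assumptions.
def _example_write_object_diff() -> bytes:
    import io

    from dulwich.object_store import MemoryObjectStore

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"hello\n")
    new_blob = Blob.from_string(b"hello world\n")
    store.add_object(old_blob)
    store.add_object(new_blob)

    buf = io.BytesIO()
    write_object_diff(
        buf,
        store,
        (b"greeting.txt", 0o100644, old_blob.id),
        (b"greeting.txt", 0o100644, new_blob.id),
    )
    return buf.getvalue()
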

# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is not None:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
        else:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"

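
# Quick sketch of the header gen_diff_header() produces for a file whose
# contents and mode both changed; the path and the two placeholder SHAs are
# invented values.
def _example_gen_diff_header() -> bytes:
    # Expected output (joined):
    #   diff --git a/scripts/run.sh b/scripts/run.sh
    #   old file mode 100644
    #   new file mode 100755
    #   index 1111111..2222222 100755
    return b"".join(
        gen_diff_header(
            (b"scripts/run.sh", b"scripts/run.sh"),
            (0o100644, 0o100755),
            (
                b"1111111111111111111111111111111111111111",
                b"2222222222222222222222222222222222222222",
            ),
        )
    )
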

# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: BinaryIO,
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (elements may be None if nonexisting)
      new_file: (path, mode, blob) tuple (elements may be None if nonexisting)

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def lines(blob: Optional["Blob"]) -> list[bytes]:
        if blob is not None:
            return blob.splitlines()
        else:
            return []

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    old_contents = lines(old_blob)
    new_contents = lines(new_blob)
    f.writelines(
        unified_diff(old_contents, new_contents, patched_old_path, patched_new_path)
    )

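
# Sketch of write_blob_diff() with two in-memory blobs (no object store
# needed); the path and contents are invented for illustration.
def _example_write_blob_diff() -> bytes:
    import io

    buf = io.BytesIO()
    write_blob_diff(
        buf,
        (b"docs/readme.txt", 0o100644, Blob.from_string(b"old text\n")),
        (b"docs/readme.txt", 0o100644, Blob.from_string(b"new text\n")),
    )
    return buf.getvalue()
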

def write_tree_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Store to retrieve objects from, if necessary
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
    """
    changes = store.tree_changes(old_tree, new_tree)
    for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes:
        write_object_diff(
            f,
            store,
            (oldpath, oldmode, oldsha),
            (newpath, newmode, newsha),
            diff_binary=diff_binary,
        )

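
# Hedged sketch: build two single-file trees in an in-memory object store and
# let write_tree_diff() emit the per-object diffs. All file names and contents
# are illustrative.
def _example_write_tree_diff() -> bytes:
    import io

    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Tree

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"version 1\n")
    new_blob = Blob.from_string(b"version 2\n")
    old_tree = Tree()
    old_tree.add(b"config.txt", 0o100644, old_blob.id)
    new_tree = Tree()
    new_tree.add(b"config.txt", 0o100644, new_blob.id)
    for obj in (old_blob, new_blob, old_tree, new_tree):
        store.add_object(obj)

    buf = io.BytesIO()
    write_tree_diff(buf, store, old_tree.id, new_tree.id)
    return buf.getvalue()
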

def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    encoding = encoding or "ascii"
    contents = f.read()
    if isinstance(contents, bytes):
        bparser = email.parser.BytesParser()
        msg = bparser.parsebytes(contents)
    else:
        uparser = email.parser.Parser()
        msg = uparser.parsestr(contents)
    return parse_patch_message(msg, encoding)

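
# Sketch of splitting a git-am style patch back into its parts; the patch text
# below is a hand-written example, not real project history.
def _example_git_am_patch_split() -> tuple[Commit, bytes, Optional[bytes]]:
    import io

    patch = (
        b"From: Jane Doe <jane@example.com>\n"
        b"Subject: [PATCH 1/1] Add greeting file\n"
        b"\n"
        b"Body of the commit message.\n"
        b"---\n"
        b"diff --git a/hello.txt b/hello.txt\n"
        b"new file mode 100644\n"
        b"--- /dev/null\n"
        b"+++ b/hello.txt\n"
        b"@@ -0,0 +1 @@\n"
        b"+hello\n"
        b"-- \n"
        b"2.43.0\n"
    )
    commit, diff, version = git_am_patch_split(io.BytesIO(patch))
    return commit, diff, version  # version == b"2.43.0"
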

def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version
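

# Closing sketch: parse_patch_message() can also be fed a message parsed
# elsewhere, e.g. with the standard library's email.message_from_bytes().
# The message content is again an invented example.
def _example_parse_patch_message() -> tuple[Commit, bytes, Optional[bytes]]:
    import email

    msg = email.message_from_bytes(
        b"From: Jane Doe <jane@example.com>\n"
        b"Subject: [PATCH] Tweak wording\n"
        b"\n"
        b"---\n"
        b"diff --git a/README b/README\n"
        b"-- \n"
        b"2.43.0\n"
    )
    return parse_patch_message(msg, encoding="utf-8")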