Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

251 statements  

1# patch.py -- For dealing with packed-style patches. 

2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Classes for dealing with git am-style patches. 

23 

24These patches are basically unified diffs with some extra metadata tacked 

25on. 

26""" 

27 

28import email.parser 

29import time 

30from collections.abc import Generator 

31from difflib import SequenceMatcher 

32from typing import ( 

33 IO, 

34 TYPE_CHECKING, 

35 BinaryIO, 

36 Optional, 

37 TextIO, 

38 Union, 

39) 

40 

41if TYPE_CHECKING: 

42 import email.message 

43 

44 from .object_store import BaseObjectStore 

45 

46from .objects import S_ISGITLINK, Blob, Commit 

47 

# Number of leading bytes inspected by is_binary() when guessing whether
# content is binary (presence of a NUL byte in this prefix).
FIRST_FEW_BYTES = 8000

# Diff algorithm used when callers do not request one explicitly.
DEFAULT_DIFF_ALGORITHM = "myers"

51 

52 

class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
          algorithm: Name of the unavailable algorithm
          install_hint: Optional installation hint appended to the message
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        message = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            message += f" {install_hint}"
        super().__init__(message)

73 

74 

def write_commit_patch(
    f: IO[bytes],
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual file patch in git-am (mbox-like) format.

    Args:
      f: File-like object to write to
      commit: Commit object the patch is generated from
      contents: Contents of the patch (unified diff text)
      progress: Tuple with current patch number and total count
      version: Version string for the trailer; defaults to the dulwich
        version
      encoding: Encoding to use for the patch; defaults to the file's
        encoding, falling back to ASCII
    """
    # getattr can return an *attribute* that is None (e.g. some binary
    # streams), so fall back to ASCII in that case too.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    # Best effort: include a diffstat summary if the external "diffstat"
    # tool is installed; silently skip it otherwise.
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        # encoding is guaranteed non-None here (normalized above).
        f.write(version.encode(encoding) + b"\n")

140 

141 

def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string (first message line, spaces replaced by dashes),
      or the empty string for an empty message
    """
    message_lines = commit.message.decode(errors="replace").splitlines()
    if not message_lines:
        return ""
    return message_lines[0].replace(" ", "-")

152 

153 

154# Unified Diff 

155def _format_range_unified(start: int, stop: int) -> str: 

156 """Convert range to the "ed" format.""" 

157 # Per the diff spec at http://www.unix.org/single_unix_specification/ 

158 beginning = start + 1 # lines start numbering with one 

159 length = stop - start 

160 if length == 1: 

161 return f"{beginning}" 

162 if not length: 

163 beginning -= 1 # empty ranges begin at line just before the range 

164 return f"{beginning},{length}" 

165 

166 

def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output

    Returns:
      Generator yielding diff lines
    """
    # This is exactly unified_diff_with_algorithm() with the default
    # "myers" matcher (plain difflib.SequenceMatcher); delegate instead of
    # duplicating the hunk-formatting logic.
    return unified_diff_with_algorithm(
        a,
        b,
        fromfile=fromfile,
        tofile=tofile,
        fromfiledate=fromfiledate,
        tofiledate=tofiledate,
        n=n,
        lineterm=lineterm,
        tree_encoding=tree_encoding,
        output_encoding=output_encoding,
        algorithm="myers",
    )

216 

217 

218def _get_sequence_matcher(algorithm: str, a: list[bytes], b: list[bytes]): 

219 """Get appropriate sequence matcher for the given algorithm. 

220 

221 Args: 

222 algorithm: Diff algorithm ("myers" or "patience") 

223 a: First sequence 

224 b: Second sequence 

225 

226 Returns: 

227 Configured sequence matcher instance 

228 

229 Raises: 

230 DiffAlgorithmNotAvailable: If patience requested but not available 

231 """ 

232 if algorithm == "patience": 

233 try: 

234 from patiencediff import PatienceSequenceMatcher 

235 

236 return PatienceSequenceMatcher(None, a, b) 

237 except ImportError: 

238 raise DiffAlgorithmNotAvailable( 

239 "patience", "Install with: pip install 'dulwich[patiencediff]'" 

240 ) 

241 else: 

242 return SequenceMatcher(a=a, b=b) 

243 

244 

def unified_diff_with_algorithm(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: Optional[str] = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience")

    Returns:
      Generator yielding diff lines

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but
        patiencediff not available
    """
    if algorithm is None:
        algorithm = DEFAULT_DIFF_ALGORITHM

    if algorithm == "patience":
        try:
            from patiencediff import PatienceSequenceMatcher

            matcher = PatienceSequenceMatcher(None, a, b)
        except ImportError:
            raise DiffAlgorithmNotAvailable(
                "patience", "Install with: pip install 'dulwich[patiencediff]'"
            )
    else:
        matcher = SequenceMatcher(a=a, b=b)

    def hunk_range(lo: int, hi: int) -> str:
        # "ed"-style range: 1-based start, with an empty range anchored to
        # the line just before it.
        first = lo + 1
        span = hi - lo
        if span == 1:
            return f"{first}"
        if span == 0:
            first -= 1
        return f"{first},{span}"

    header_emitted = False
    for opcodes in matcher.get_grouped_opcodes(n):
        # Emit the ---/+++ header once, just before the first hunk.
        if not header_emitted:
            header_emitted = True
            from_suffix = f"\t{fromfiledate}" if fromfiledate else ""
            to_suffix = f"\t{tofiledate}" if tofiledate else ""
            from_name = fromfile.decode(tree_encoding)
            to_name = tofile.decode(tree_encoding)
            yield f"--- {from_name}{from_suffix}{lineterm}".encode(output_encoding)
            yield f"+++ {to_name}{to_suffix}{lineterm}".encode(output_encoding)

        old_range = hunk_range(opcodes[0][1], opcodes[-1][2])
        new_range = hunk_range(opcodes[0][3], opcodes[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in opcodes:
            if tag == "equal":
                for unchanged in a[i1:i2]:
                    yield b" " + unchanged
                continue
            if tag != "insert":  # "replace" or "delete": emit removed lines
                for removed in a[i1:i2]:
                    if removed[-1:] != b"\n":
                        removed += b"\n\\ No newline at end of file\n"
                    yield b"-" + removed
            if tag != "delete":  # "replace" or "insert": emit added lines
                for added in b[j1:j2]:
                    if added[-1:] != b"\n":
                        added += b"\n\\ No newline at end of file\n"
                    yield b"+" + added

317 

318 

def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    # find() with an end bound scans the same prefix as slicing would,
    # without materializing a copy.
    return content.find(b"\0", 0, FIRST_FEW_BYTES) != -1

326 

327 

def shortid(hexsha: Optional[bytes]) -> bytes:
    """Get short object ID.

    Args:
      hexsha: Full hex SHA or None

    Returns:
      7-character short ID (all zeros when hexsha is None)
    """
    return b"0000000" if hexsha is None else hexsha[:7]

341 

342 

def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Generate patch filename.

    Args:
      p: Path or None
      root: Root directory

    Returns:
      Full patch filename (/dev/null for a missing file)
    """
    if p is None:
        return b"/dev/null"
    return b"/".join([root, p])

357 

358 

def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    old_path, old_mode, old_id = old_file
    new_path, new_mode, new_id = new_file
    path_a = patch_filename(old_path, b"a")
    path_b = patch_filename(new_path, b"b")

    def _blob_for(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        # Resolve a (mode, sha) pair to a Blob with diffable content.
        if hexsha is None:
            # Nonexistent file: diff against empty content.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Submodules diff as a synthetic one-line blob.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def _split(blob: "Blob") -> list[bytes]:
        # An empty blob contributes no lines at all.
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    blob_a = _blob_for(old_mode, old_id)
    blob_b = _blob_for(new_mode, new_id)
    if not diff_binary and (is_binary(blob_a.data) or is_binary(blob_b.data)):
        f.write(b"Binary files " + path_a + b" and " + path_b + b" differ\n")
    else:
        f.writelines(
            unified_diff_with_algorithm(
                _split(blob_a),
                _split(blob_b),
                path_a,
                path_b,
                algorithm=diff_algorithm,
            )
        )

445 

446 

447# TODO(jelmer): Support writing unicode, rather than bytes. 

def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    old_path, new_path = paths
    old_mode, new_mode = modes
    old_sha, new_sha = shas
    # For pure additions/deletions, show the surviving path on both sides,
    # as git does.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    path_a = patch_filename(old_path, b"a")
    path_b = patch_filename(new_path, b"b")
    yield b"diff --git " + path_a + b" " + path_b + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        elif old_mode is None:
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
        else:
            yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode suffix only appears when the file exists on both sides.
    if old_mode is not None and new_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"

482 

483 

484# TODO(jelmer): Support writing unicode, rather than bytes. 

def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (elements None if nonexisting)
      new_file: (path, mode, blob) tuple (elements None if nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    old_path, old_mode, old_blob = old_file
    new_path, new_mode, new_blob = new_file

    def _lines(blob: Optional["Blob"]) -> list[bytes]:
        # A missing blob contributes no lines to the diff.
        return [] if blob is None else blob.splitlines()

    old_sha = getattr(old_blob, "id", None)
    new_sha = getattr(new_blob, "id", None)
    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_sha, new_sha))
    )
    f.writelines(
        unified_diff_with_algorithm(
            _lines(old_blob),
            _lines(new_blob),
            patch_filename(old_path, b"a"),
            patch_filename(new_path, b"b"),
            algorithm=diff_algorithm,
        )
    )

538 

539 

def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one object diff per changed entry reported by the store.
    for change in store.tree_changes(old_tree, new_tree):
        (old_path, new_path), (old_mode, new_mode), (old_sha, new_sha) = change
        write_object_diff(
            f,
            store,
            (old_path, old_mode, old_sha),
            (new_path, new_mode, new_sha),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )

569 

570 

def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # The file's own encoding attribute may be absent or None; default to
    # ASCII in either case.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    # Pick the matching email parser for bytes vs. text input.
    if isinstance(contents, bytes):
        msg = email.parser.BytesParser().parsebytes(contents)
    else:
        msg = email.parser.Parser().parsestr(contents)
    return parse_patch_message(msg, encoding)

591 

592 

def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # The mail sender is used for both author and committer; a "From: "
    # line inside the body (see below) can override the author.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    # Strip a "[PATCH ...] " tag from the subject, if present, to recover
    # the commit summary line.
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    # Normalize the payload to bytes; get_payload(decode=True) may return
    # bytes, and other shapes are stringified as a fallback.
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    # NOTE: the same iterator is consumed by both loops below, so the
    # second loop resumes right after the "---\n" separator.
    line_iter = iter(lines)

    # Everything up to the "---" separator is the commit message body.
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            # The first body line is special: a "From: " line overrides the
            # author; otherwise it starts the extended message (separated
            # from the summary by a blank line).
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # Everything between "---" and the "-- " signature marker is the diff.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    # The line after "-- " (if any) is the git/tool version trailer.
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version

648 return c, diff, version