Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

390 statements  

1# patch.py -- For dealing with packed-style patches. 

2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Classes for dealing with git am-style patches. 

23 

24These patches are basically unified diffs with some extra metadata tacked 

25on. 

26""" 

27 

# Names exported as the public API of this module.
__all__ = [
    "DEFAULT_DIFF_ALGORITHM",
    "FIRST_FEW_BYTES",
    "DiffAlgorithmNotAvailable",
    "MailinfoResult",
    "commit_patch_id",
    "gen_diff_header",
    "get_summary",
    "git_am_patch_split",
    "is_binary",
    "mailinfo",
    "parse_patch_message",
    "patch_filename",
    "patch_id",
    "shortid",
    "unified_diff",
    "unified_diff_with_algorithm",
    "write_blob_diff",
    "write_commit_patch",
    "write_object_diff",
    "write_tree_diff",
]

50 

51import email.message 

52import email.parser 

53import email.utils 

54import re 

55import time 

56from collections.abc import Generator, Sequence 

57from dataclasses import dataclass 

58from difflib import SequenceMatcher 

59from typing import ( 

60 IO, 

61 TYPE_CHECKING, 

62 BinaryIO, 

63 TextIO, 

64) 

65 

66if TYPE_CHECKING: 

67 from .object_store import BaseObjectStore 

68 

69from .objects import S_ISGITLINK, Blob, Commit, ObjectID, RawObjectID 

70 

# Number of leading bytes scanned by is_binary() when guessing binary content.
FIRST_FEW_BYTES = 8000

# Diff algorithm used when callers do not request one explicitly.
DEFAULT_DIFF_ALGORITHM = "myers"

74 

75 

class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
            algorithm: Name of the unavailable algorithm
            install_hint: Optional installation hint
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        # Build the message once, appending the hint only when one was given.
        message = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            message += f" {install_hint}"
        super().__init__(message)

96 

97 

98def write_commit_patch( 

99 f: IO[bytes], 

100 commit: "Commit", 

101 contents: str | bytes, 

102 progress: tuple[int, int], 

103 version: str | None = None, 

104 encoding: str | None = None, 

105) -> None: 

106 """Write a individual file patch. 

107 

108 Args: 

109 f: File-like object to write to 

110 commit: Commit object 

111 contents: Contents of the patch 

112 progress: tuple with current patch number and total. 

113 version: Version string to include in patch header 

114 encoding: Encoding to use for the patch 

115 

116 Returns: 

117 tuple with filename and contents 

118 """ 

119 encoding = encoding or getattr(f, "encoding", "ascii") 

120 if encoding is None: 

121 encoding = "ascii" 

122 if isinstance(contents, str): 

123 contents = contents.encode(encoding) 

124 (num, total) = progress 

125 f.write( 

126 b"From " 

127 + commit.id 

128 + b" " 

129 + time.ctime(commit.commit_time).encode(encoding) 

130 + b"\n" 

131 ) 

132 f.write(b"From: " + commit.author + b"\n") 

133 f.write( 

134 b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n" 

135 ) 

136 f.write( 

137 (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n" 

138 ) 

139 f.write(b"\n") 

140 f.write(b"---\n") 

141 try: 

142 import subprocess 

143 

144 p = subprocess.Popen( 

145 ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE 

146 ) 

147 except (ImportError, OSError): 

148 pass # diffstat not available? 

149 else: 

150 (diffstat, _) = p.communicate(contents) 

151 f.write(diffstat) 

152 f.write(b"\n") 

153 f.write(contents) 

154 f.write(b"-- \n") 

155 if version is None: 

156 from dulwich import __version__ as dulwich_version 

157 

158 f.write(b"Dulwich %d.%d.%d\n" % dulwich_version) 

159 else: 

160 if encoding is None: 

161 encoding = "ascii" 

162 f.write(version.encode(encoding) + b"\n") 

163 

164 

def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
        commit: Commit
    Returns: Summary string
    """
    text = commit.message.decode(errors="replace")
    # First line of the message, with spaces made filename-friendly;
    # an empty message yields an empty summary.
    for first_line in text.splitlines():
        return first_line.replace(" ", "-")
    return ""

175 

176 

177# Unified Diff 

178def _format_range_unified(start: int, stop: int) -> str: 

179 """Convert range to the "ed" format.""" 

180 # Per the diff spec at http://www.unix.org/single_unix_specification/ 

181 beginning = start + 1 # lines start numbering with one 

182 length = stop - start 

183 if length == 1: 

184 return f"{beginning}" 

185 if not length: 

186 beginning -= 1 # empty ranges begin at line just before the range 

187 return f"{beginning},{length}" 

188 

189 

def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def _flag_missing_newline(line: bytes) -> bytes:
        # git annotates lines that do not end with a newline.
        if line[-1:] == b"\n":
            return line
        return line + b"\n\\ No newline at end of file\n"

    wrote_header = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not wrote_header:
            # The ---/+++ header is emitted once, before the first hunk.
            wrote_header = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        old_range = _format_range_unified(group[0][1], group[-1][2])
        new_range = _format_range_unified(group[0][3], group[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + _flag_missing_newline(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + _flag_missing_newline(line)

239 

240 

241def _get_sequence_matcher( 

242 algorithm: str, a: Sequence[bytes], b: Sequence[bytes] 

243) -> SequenceMatcher[bytes]: 

244 """Get appropriate sequence matcher for the given algorithm. 

245 

246 Args: 

247 algorithm: Diff algorithm ("myers" or "patience") 

248 a: First sequence 

249 b: Second sequence 

250 

251 Returns: 

252 Configured sequence matcher instance 

253 

254 Raises: 

255 DiffAlgorithmNotAvailable: If patience requested but not available 

256 """ 

257 if algorithm == "patience": 

258 try: 

259 from patiencediff import PatienceSequenceMatcher 

260 

261 return PatienceSequenceMatcher(None, a, b) # type: ignore[no-any-return,unused-ignore] 

262 except ImportError: 

263 raise DiffAlgorithmNotAvailable( 

264 "patience", "Install with: pip install 'dulwich[patiencediff]'" 

265 ) 

266 else: 

267 return SequenceMatcher(a=a, b=b) 

268 

269 

def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: str | None = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
        a: First sequence of lines
        b: Second sequence of lines
        fromfile: Name of first file
        tofile: Name of second file
        fromfiledate: Date of first file
        tofiledate: Date of second file
        n: Number of context lines
        lineterm: Line terminator
        tree_encoding: Encoding for tree paths
        output_encoding: Encoding for output
        algorithm: Diff algorithm to use ("myers" or "patience")

    Returns:
        Generator yielding diff lines

    Raises:
        DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
    """
    matcher = _get_sequence_matcher(algorithm or DEFAULT_DIFF_ALGORITHM, a, b)

    def _flag_missing_newline(line: bytes) -> bytes:
        # git annotates lines that do not end with a newline.
        if line[-1:] == b"\n":
            return line
        return line + b"\n\\ No newline at end of file\n"

    wrote_header = False
    for group in matcher.get_grouped_opcodes(n):
        if not wrote_header:
            # Emit the ---/+++ header once, before the first hunk.
            wrote_header = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        file1_range = _format_range_unified(group[0][1], group[-1][2])
        file2_range = _format_range_unified(group[0][3], group[-1][4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + _flag_missing_newline(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + _flag_missing_newline(line)

342 

343 

def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
        content: Bytestring to check for binary content
    """
    # Only the leading FIRST_FEW_BYTES are inspected, mirroring git's heuristic.
    return content[:FIRST_FEW_BYTES].find(b"\0") != -1

351 

352 

353def shortid(hexsha: bytes | None) -> bytes: 

354 """Get short object ID. 

355 

356 Args: 

357 hexsha: Full hex SHA or None 

358 

359 Returns: 

360 7-character short ID 

361 """ 

362 if hexsha is None: 

363 return b"0" * 7 

364 else: 

365 return hexsha[:7] 

366 

367 

368def patch_filename(p: bytes | None, root: bytes) -> bytes: 

369 """Generate patch filename. 

370 

371 Args: 

372 p: Path or None 

373 root: Root directory 

374 

375 Returns: 

376 Full patch filename 

377 """ 

378 if p is None: 

379 return b"/dev/null" 

380 else: 

381 return root + b"/" + p 

382 

383 

def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[bytes | None, int | None, ObjectID | None],
    new_file: tuple[bytes | None, int | None, ObjectID | None],
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff for an object.

    Args:
        f: File-like object to write to
        store: Store to retrieve objects from, if necessary
        old_file: (path, mode, hexsha) tuple
        new_file: (path, mode, hexsha) tuple
        diff_binary: Whether to diff files even if they
            are considered binary files by is_binary().
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def _blob_for(mode: int | None, hexsha: ObjectID | None) -> Blob:
        """Resolve a (mode, sha) pair to a Blob with diffable content."""
        if hexsha is None:
            # Nonexistent side diffs as an empty blob.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Submodules are rendered as a one-line placeholder.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def _split(blob: "Blob") -> list[bytes]:
        """Split blob content into lines; empty blobs give no lines."""
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_blob = _blob_for(old_mode, old_id)
    new_blob = _blob_for(new_mode, new_id)
    if not diff_binary and (is_binary(old_blob.data) or is_binary(new_blob.data)):
        # Match git's terse output for binary content.
        f.write(
            b"Binary files " + old_label + b" and " + new_label + b" differ\n"
        )
    else:
        f.writelines(
            unified_diff_with_algorithm(
                _split(old_blob),
                _split(new_blob),
                old_label,
                new_label,
                algorithm=diff_algorithm,
            )
        )

470 

471 

472# TODO(jelmer): Support writing unicode, rather than bytes. 

# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[bytes | None, bytes | None],
    modes: tuple[int | None, int | None],
    shas: tuple[bytes | None, bytes | None],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
        paths: Tuple with old and new path
        modes: Tuple with old and new modes
        shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    # For adds/deletes, label both sides with the surviving path.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode suffix is only written when the entry exists on both sides.
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"

507 

508 

509# TODO(jelmer): Support writing unicode, rather than bytes. 

# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[bytes | None, int | None, "Blob | None"],
    new_file: tuple[bytes | None, int | None, "Blob | None"],
    diff_algorithm: str | None = None,
) -> None:
    """Write blob diff.

    Args:
        f: File-like object to write to
        old_file: (path, mode, hexsha) tuple (None if nonexisting)
        new_file: (path, mode, hexsha) tuple (None if nonexisting)
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    old_path, old_mode, old_blob = old_file
    new_path, new_mode, new_blob = new_file

    def _content_lines(blob: "Blob | None") -> list[bytes]:
        """Split blob content into lines; a missing blob diffs as empty."""
        return [] if blob is None else blob.splitlines()

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff_with_algorithm(
            _content_lines(old_blob),
            _content_lines(new_blob),
            patch_filename(old_path, b"a"),
            patch_filename(new_path, b"b"),
            algorithm=diff_algorithm,
        )
    )

563 

564 

def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: ObjectID | None,
    new_tree: ObjectID | None,
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write tree diff.

    Args:
        f: File-like object to write to.
        store: Object store to read from
        old_tree: Old tree id
        new_tree: New tree id
        diff_binary: Whether to diff files even if they
            are considered binary files by is_binary().
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one object diff per changed entry, in the order the store reports.
    for paths, modes, shas in store.tree_changes(old_tree, new_tree):
        old_entry = (paths[0], modes[0], shas[0])
        new_entry = (paths[1], modes[1], shas[1])
        write_object_diff(
            f,
            store,
            old_entry,
            new_entry,
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )

594 

595 

def git_am_patch_split(
    f: TextIO | BinaryIO, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
        f: File-like object to parse
        encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # Prefer the caller's encoding, then the stream's, then ASCII.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    data = f.read()
    # Pick the parser matching the stream type (bytes vs text).
    if isinstance(data, bytes):
        message = email.parser.BytesParser().parsebytes(data)
    else:
        message = email.parser.Parser().parsestr(data)
    return parse_patch_message(message, encoding)

616 

617 

def parse_patch_message(
    msg: email.message.Message, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
        msg: An email message (email.message.Message)
        encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    commit = Commit()
    if encoding is None:
        encoding = "ascii"
    author = msg["from"].encode(encoding)
    commit.author = author
    commit.committer = author

    # Strip a leading "[PATCH ...]" tag from the subject when present.
    raw_subject = msg["subject"]
    try:
        tag_start = raw_subject.index("[PATCH")
    except ValueError:
        subject = raw_subject
    else:
        tag_close = raw_subject.index("] ", tag_start)
        subject = raw_subject[tag_close + 2 :]
    commit.message = (subject.replace("\n", "") + "\n").encode(encoding)

    payload = msg.get_payload(decode=True)
    if isinstance(payload, str):
        payload = payload.encode(encoding)
    if not isinstance(payload, bytes):
        # Handle other types by converting to string first
        payload = str(payload).encode(encoding)
    line_iter = iter(payload.splitlines(True))

    # Body up to the "---" separator extends the commit message; a leading
    # "From: " line overrides the author taken from the mail header.
    first = True
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                commit.author = line[len(b"From: ") :].rstrip()
            else:
                commit.message += b"\n" + line
            first = False
        else:
            commit.message += line

    # Everything after "---" up to the signature marker is the diff.
    diff_chunks = []
    for line in line_iter:
        if line == b"-- \n":
            break
        diff_chunks.append(line)

    # The line after the signature marker, if any, carries the git version.
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return commit, b"".join(diff_chunks), version

674 

675 

def patch_id(diff_data: bytes) -> bytes:
    """Compute patch ID for a diff.

    The patch ID is computed by normalizing the diff and computing a SHA1 hash.
    This follows git's patch-id algorithm which:
    1. Removes whitespace from lines starting with + or -
    2. Replaces line numbers in @@ headers with a canonical form
    3. Computes SHA1 of the result

    Args:
        diff_data: Raw diff data as bytes

    Returns:
        SHA1 hash of normalized diff (40-byte hex string)

    TODO: This implementation uses a simple line-by-line approach. For better
    compatibility with git's patch-id, consider using proper patch parsing that:
    - Handles edge cases in diff format (binary diffs, mode changes, etc.)
    - Properly parses unified diff format according to the spec
    - Matches git's exact normalization algorithm byte-for-byte
    See git's patch-id.c for reference implementation.
    """
    import hashlib
    import re

    # File-level header lines carry no patch content and are dropped entirely.
    header_prefixes = (
        b"diff --git ",
        b"index ",
        b"--- ",
        b"+++ ",
        b"new file mode ",
        b"old file mode ",
        b"deleted file mode ",
        b"new mode ",
        b"old mode ",
        b"similarity index ",
        b"dissimilarity index ",
        b"rename from ",
        b"rename to ",
        b"copy from ",
        b"copy to ",
    )
    hunk_header = re.compile(rb"^@@\s+-\d+(?:,\d+)?\s+\+\d+(?:,\d+)?\s+@@")

    pieces: list[bytes] = []
    for raw in diff_data.split(b"\n"):
        if raw.startswith(header_prefixes):
            continue
        # Hunk headers collapse to a bare "@@" so line numbers don't matter.
        if raw.startswith(b"@@") and hunk_header.match(raw):
            pieces.append(b"@@")
            continue
        # Added/removed lines keep their sign but lose all whitespace.
        if raw[:1] in (b"+", b"-"):
            pieces.append(raw[:1] + raw[1:].replace(b" ", b"").replace(b"\t", b""))
            continue
        # Context lines and blanks are kept verbatim; anything else is dropped.
        if raw.startswith(b" ") or raw == b"":
            pieces.append(raw)

    return hashlib.sha1(b"\n".join(pieces)).hexdigest().encode("ascii")

755 

756 

def commit_patch_id(
    store: "BaseObjectStore", commit_id: ObjectID | RawObjectID
) -> bytes:
    """Compute patch ID for a commit.

    Args:
        store: Object store to read objects from
        commit_id: Commit ID (40-byte hex string)

    Returns:
        Patch ID (40-byte hex string)
    """
    from io import BytesIO

    commit = store[commit_id]
    assert isinstance(commit, Commit)

    # Diff against the first parent's tree; a root commit is diffed
    # against the empty tree.
    base_tree = None
    if commit.parents:
        first_parent = store[commit.parents[0]]
        assert isinstance(first_parent, Commit)
        base_tree = first_parent.tree

    buf = BytesIO()
    write_tree_diff(buf, store, base_tree, commit.tree)
    return patch_id(buf.getvalue())

788 

789 

@dataclass
class MailinfoResult:
    """Result of mailinfo parsing.

    Attributes:
        author_name: Author's name
        author_email: Author's email address
        author_date: Author's date (if present in the email)
        subject: Processed subject line
        message: Commit message body
        patch: Patch content
        message_id: Message-ID header (if -m/--message-id was used)
    """

    author_name: str
    author_email: str
    # Date header text taken verbatim from the mail, or None when absent.
    author_date: str | None
    subject: str
    message: str
    # Text that followed the "---" separator in the mail body.
    patch: str
    # Only populated when mailinfo() was called with message_id=True.
    message_id: str | None = None

811 

812 

813def _munge_subject(subject: str, keep_subject: bool, keep_non_patch: bool) -> str: 

814 """Munge email subject line for commit message. 

815 

816 Args: 

817 subject: Original subject line 

818 keep_subject: If True, keep subject intact (-k option) 

819 keep_non_patch: If True, only strip [PATCH] (-b option) 

820 

821 Returns: 

822 Processed subject line 

823 """ 

824 if keep_subject: 

825 return subject 

826 

827 result = subject 

828 

829 # First remove Re: prefixes (they can appear before brackets) 

830 while True: 

831 new_result = re.sub(r"^\s*(?:re|RE|Re):\s*", "", result, flags=re.IGNORECASE) 

832 if new_result == result: 

833 break 

834 result = new_result 

835 

836 # Remove bracketed strings 

837 if keep_non_patch: 

838 # Only remove brackets containing "PATCH" 

839 # Match each bracket individually anywhere in the string 

840 while True: 

841 # Remove PATCH bracket, but be careful with whitespace 

842 new_result = re.sub( 

843 r"\[[^\]]*?PATCH[^\]]*?\](\s+)?", r"\1", result, flags=re.IGNORECASE 

844 ) 

845 if new_result == result: 

846 break 

847 result = new_result 

848 else: 

849 # Remove all bracketed strings 

850 while True: 

851 new_result = re.sub(r"^\s*\[.*?\]\s*", "", result) 

852 if new_result == result: 

853 break 

854 result = new_result 

855 

856 # Remove leading/trailing whitespace 

857 result = result.strip() 

858 

859 # Normalize multiple whitespace to single space 

860 result = re.sub(r"\s+", " ", result) 

861 

862 return result 

863 

864 

865def _find_scissors_line(lines: list[bytes]) -> int | None: 

866 """Find the scissors line in message body. 

867 

868 Args: 

869 lines: List of lines in the message body 

870 

871 Returns: 

872 Index of scissors line, or None if not found 

873 """ 

874 scissors_pattern = re.compile( 

875 rb"^(?:>?\s*-+\s*)?(?:8<|>8)?\s*-+\s*$|^(?:>?\s*-+\s*)(?:cut here|scissors)(?:\s*-+)?$", 

876 re.IGNORECASE, 

877 ) 

878 

879 for i, line in enumerate(lines): 

880 if scissors_pattern.match(line.strip()): 

881 return i 

882 

883 return None 

884 

885 

def mailinfo(
    msg: email.message.Message | BinaryIO | TextIO,
    keep_subject: bool = False,
    keep_non_patch: bool = False,
    encoding: str | None = None,
    scissors: bool = False,
    message_id: bool = False,
) -> MailinfoResult:
    """Extract patch information from an email message.

    This function parses an email message and extracts commit metadata
    (author, email, subject) and separates the commit message from the
    patch content, similar to git mailinfo.

    Args:
        msg: Email message (email.message.Message object) or file handle to read from
        keep_subject: If True, keep subject intact without munging (-k)
        keep_non_patch: If True, only strip [PATCH] from brackets (-b)
        encoding: Character encoding to use (default: detect from message)
        scissors: If True, remove everything before scissors line
        message_id: If True, include Message-ID in commit message (-m)

    Returns:
        MailinfoResult with parsed information

    Raises:
        ValueError: If message is malformed or missing required fields
    """
    # Parse message if given a file handle
    parsed_msg: email.message.Message
    if not isinstance(msg, email.message.Message):
        if hasattr(msg, "read"):
            content = msg.read()
            # Choose the parser matching the stream type (bytes vs text).
            if isinstance(content, bytes):
                bparser = email.parser.BytesParser()
                parsed_msg = bparser.parsebytes(content)
            else:
                sparser = email.parser.Parser()
                parsed_msg = sparser.parsestr(content)
        else:
            raise ValueError("msg must be an email.message.Message or file-like object")
    else:
        parsed_msg = msg

    # Detect encoding from message if not specified
    if encoding is None:
        encoding = parsed_msg.get_content_charset() or "utf-8"

    # Extract author information
    from_header = parsed_msg.get("From", "")
    if not from_header:
        raise ValueError("Email message missing 'From' header")

    # Parse "Name <email>" format
    author_name, author_email = email.utils.parseaddr(from_header)
    if not author_email:
        raise ValueError(
            f"Could not parse email address from 'From' header: {from_header}"
        )

    # Extract date (kept verbatim from the Date header when present)
    date_header = parsed_msg.get("Date")
    author_date = date_header if date_header else None

    # Extract and process subject
    subject = parsed_msg.get("Subject", "")
    if not subject:
        subject = "(no subject)"

    # Convert Header object to string if needed
    subject = str(subject)

    # Remove newlines from subject
    subject = subject.replace("\n", " ").replace("\r", " ")
    subject = _munge_subject(subject, keep_subject, keep_non_patch)

    # Extract Message-ID if requested
    msg_id = None
    if message_id:
        msg_id = parsed_msg.get("Message-ID")

    # Get message body (decode=True undoes transfer encoding; may yield
    # bytes, str, None, or a list for multipart messages)
    body = parsed_msg.get_payload(decode=True)
    if body is None:
        body = b""
    elif isinstance(body, str):
        body = body.encode(encoding)
    elif not isinstance(body, bytes):
        # Handle multipart or other types
        body = str(body).encode(encoding)

    # Split into lines
    lines = body.splitlines(keepends=True)

    # Handle scissors
    scissors_idx = None
    if scissors:
        scissors_idx = _find_scissors_line(lines)
        if scissors_idx is not None:
            # Remove everything up to and including scissors line
            lines = lines[scissors_idx + 1 :]

    # Separate commit message from patch
    # Look for the "---" separator that indicates start of diffstat/patch
    message_lines: list[bytes] = []
    patch_lines: list[bytes] = []
    in_patch = False

    for line in lines:
        if not in_patch and line == b"---\n":
            in_patch = True
            patch_lines.append(line)
        elif in_patch:
            # Stop at signature marker "-- "
            if line == b"-- \n":
                break
            patch_lines.append(line)
        else:
            message_lines.append(line)

    # Build commit message
    commit_message = b"".join(message_lines).decode(encoding, errors="replace")

    # Clean up commit message
    commit_message = commit_message.strip()

    # Append Message-ID if requested
    if message_id and msg_id:
        if commit_message:
            commit_message += "\n\n"
        commit_message += f"Message-ID: {msg_id}"

    # Build patch content
    patch_content = b"".join(patch_lines).decode(encoding, errors="replace")

    return MailinfoResult(
        author_name=author_name,
        author_email=author_email,
        author_date=author_date,
        subject=subject,
        message=commit_message,
        patch=patch_content,
        message_id=msg_id,
    )

1029 )