Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

389 statements  

1# patch.py -- For dealing with packed-style patches. 

2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Classes for dealing with git am-style patches. 

23 

24These patches are basically unified diffs with some extra metadata tacked 

25on. 

26""" 

27 

28import email.message 

29import email.parser 

30import email.utils 

31import re 

32import time 

33from collections.abc import Generator, Sequence 

34from dataclasses import dataclass 

35from difflib import SequenceMatcher 

36from typing import ( 

37 IO, 

38 TYPE_CHECKING, 

39 BinaryIO, 

40 Optional, 

41 TextIO, 

42) 

43 

44if TYPE_CHECKING: 

45 from .object_store import BaseObjectStore 

46 

47from .objects import S_ISGITLINK, Blob, Commit 

48 

# Number of leading bytes inspected by is_binary() when deciding whether
# content should be treated as binary.
FIRST_FEW_BYTES = 8000

# Diff algorithm used when callers do not request one explicitly.
DEFAULT_DIFF_ALGORITHM = "myers"

52 

53 

class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
            algorithm: Name of the unavailable algorithm
            install_hint: Optional installation hint appended to the message
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        # Build the message once; append the hint only when one was given.
        text = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            text = f"{text} {install_hint}"
        super().__init__(text)

74 

75 

def write_commit_patch(
    f: IO[bytes],
    commit: "Commit",
    contents: str | bytes,
    progress: tuple[int, int],
    version: str | None = None,
    encoding: str | None = None,
) -> None:
    """Write an individual git-am-style patch for one commit.

    Args:
        f: File-like object to write to
        commit: Commit object
        contents: Contents of the patch (the diff body)
        progress: tuple with current patch number and total.
        version: Version string to include in patch trailer; defaults to
            the running dulwich version
        encoding: Encoding to use for the patch
    """
    # Resolve the output encoding: explicit arg, then the stream's own
    # encoding attribute, then ASCII.
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        # getattr can still return an explicit None from f.encoding.
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    # mbox-style "From <sha> <date>" separator line.
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    # NOTE(review): uses the current wall-clock time, not the commit's
    # author/commit time — confirm this is intentional.
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    # Best-effort diffstat via the external "diffstat" tool; silently
    # skipped when the binary is missing.
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    # Signature marker separating the patch from the version trailer.
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        if encoding is None:
            # Defensive re-check; encoding was already normalized above.
            encoding = "ascii"
        f.write(version.encode(encoding) + b"\n")

141 

142 

def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
        commit: Commit
    Returns: Summary string (first message line, spaces replaced by dashes)
    """
    text = commit.message.decode(errors="replace")
    split = text.splitlines()
    if not split:
        return ""
    return split[0].replace(" ", "-")

153 

154 

155# Unified Diff 

156def _format_range_unified(start: int, stop: int) -> str: 

157 """Convert range to the "ed" format.""" 

158 # Per the diff spec at http://www.unix.org/single_unix_specification/ 

159 beginning = start + 1 # lines start numbering with one 

160 length = stop - start 

161 if length == 1: 

162 return f"{beginning}" 

163 if not length: 

164 beginning -= 1 # empty ranges begin at line just before the range 

165 return f"{beginning},{length}" 

166 

167 

def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def _span(lo: int, hi: int) -> str:
        # Unified-diff ("ed") range notation; same rendering as
        # _format_range_unified, inlined here.
        count = hi - lo
        if count == 1:
            return str(lo + 1)
        # Empty ranges are anchored at the line just before the range.
        return f"{lo + 1 if count else lo},{count}"

    def _annotate(text: bytes) -> bytes:
        # Mimic git's "\ No newline at end of file" marker.
        if text[-1:] == b"\n":
            return text
        return text + b"\n\\ No newline at end of file\n"

    emitted_header = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not emitted_header:
            emitted_header = True
            from_stamp = f"\t{fromfiledate}" if fromfiledate else ""
            to_stamp = f"\t{tofiledate}" if tofiledate else ""
            yield (
                f"--- {fromfile.decode(tree_encoding)}{from_stamp}{lineterm}"
            ).encode(output_encoding)
            yield (
                f"+++ {tofile.decode(tree_encoding)}{to_stamp}{lineterm}"
            ).encode(output_encoding)

        old_span = _span(group[0][1], group[-1][2])
        new_span = _span(group[0][3], group[-1][4])
        yield f"@@ -{old_span} +{new_span} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for unchanged in a[i1:i2]:
                    yield b" " + unchanged
                continue
            if tag != "insert":
                for removed in a[i1:i2]:
                    yield b"-" + _annotate(removed)
            if tag != "delete":
                for added in b[j1:j2]:
                    yield b"+" + _annotate(added)

217 

218 

219def _get_sequence_matcher( 

220 algorithm: str, a: Sequence[bytes], b: Sequence[bytes] 

221) -> SequenceMatcher[bytes]: 

222 """Get appropriate sequence matcher for the given algorithm. 

223 

224 Args: 

225 algorithm: Diff algorithm ("myers" or "patience") 

226 a: First sequence 

227 b: Second sequence 

228 

229 Returns: 

230 Configured sequence matcher instance 

231 

232 Raises: 

233 DiffAlgorithmNotAvailable: If patience requested but not available 

234 """ 

235 if algorithm == "patience": 

236 try: 

237 from patiencediff import PatienceSequenceMatcher 

238 

239 return PatienceSequenceMatcher(None, a, b) # type: ignore[no-any-return,unused-ignore] 

240 except ImportError: 

241 raise DiffAlgorithmNotAvailable( 

242 "patience", "Install with: pip install 'dulwich[patiencediff]'" 

243 ) 

244 else: 

245 return SequenceMatcher(a=a, b=b) 

246 

247 

def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: str | None = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
        a: First sequence of lines
        b: Second sequence of lines
        fromfile: Name of first file
        tofile: Name of second file
        fromfiledate: Date of first file
        tofiledate: Date of second file
        n: Number of context lines
        lineterm: Line terminator
        tree_encoding: Encoding for tree paths
        output_encoding: Encoding for output
        algorithm: Diff algorithm to use ("myers" or "patience");
            None selects DEFAULT_DIFF_ALGORITHM

    Returns:
        Generator yielding diff lines

    Raises:
        DiffAlgorithmNotAvailable: If patience algorithm requested but
            patiencediff is not available
    """
    matcher = _get_sequence_matcher(
        DEFAULT_DIFF_ALGORITHM if algorithm is None else algorithm, a, b
    )

    def _annotate(text: bytes) -> bytes:
        # Mimic git's "\ No newline at end of file" marker.
        if text[-1:] == b"\n":
            return text
        return text + b"\n\\ No newline at end of file\n"

    wrote_header = False
    for group in matcher.get_grouped_opcodes(n):
        if not wrote_header:
            wrote_header = True
            from_stamp = f"\t{fromfiledate}" if fromfiledate else ""
            to_stamp = f"\t{tofiledate}" if tofiledate else ""
            yield (
                f"--- {fromfile.decode(tree_encoding)}{from_stamp}{lineterm}"
            ).encode(output_encoding)
            yield (
                f"+++ {tofile.decode(tree_encoding)}{to_stamp}{lineterm}"
            ).encode(output_encoding)

        old_span = _format_range_unified(group[0][1], group[-1][2])
        new_span = _format_range_unified(group[0][3], group[-1][4])
        yield f"@@ -{old_span} +{new_span} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for context_line in a[i1:i2]:
                    yield b" " + context_line
                continue
            if tag != "insert":
                for removed in a[i1:i2]:
                    yield b"-" + _annotate(removed)
            if tag != "delete":
                for added in b[j1:j2]:
                    yield b"+" + _annotate(added)

320 

321 

def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
        content: Bytestring to check for binary content
    """
    # Only a bounded prefix is scanned so huge blobs stay cheap to check.
    return content[:FIRST_FEW_BYTES].find(b"\0") != -1

329 

330 

331def shortid(hexsha: bytes | None) -> bytes: 

332 """Get short object ID. 

333 

334 Args: 

335 hexsha: Full hex SHA or None 

336 

337 Returns: 

338 7-character short ID 

339 """ 

340 if hexsha is None: 

341 return b"0" * 7 

342 else: 

343 return hexsha[:7] 

344 

345 

346def patch_filename(p: bytes | None, root: bytes) -> bytes: 

347 """Generate patch filename. 

348 

349 Args: 

350 p: Path or None 

351 root: Root directory 

352 

353 Returns: 

354 Full patch filename 

355 """ 

356 if p is None: 

357 return b"/dev/null" 

358 else: 

359 return root + b"/" + p 

360 

361 

def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[bytes | None, int | None, bytes | None],
    new_file: tuple[bytes | None, int | None, bytes | None],
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff for an object.

    Args:
        f: File-like object to write to
        store: Store to retrieve objects from, if necessary
        old_file: (path, mode, hexsha) tuple
        new_file: (path, mode, hexsha) tuple
        diff_binary: Whether to diff files even if they
            are considered binary files by is_binary().
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    # "a/" and "b/" prefixes as in git's default diff output.
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: int | None, hexsha: bytes | None) -> Blob:
        """Get blob content for a file.

        Args:
            mode: File mode
            hexsha: Object SHA

        Returns:
            Blob object (empty for a missing file; synthetic
            "Subproject commit ..." text for a gitlink/submodule entry)
        """
        if hexsha is None:
            return Blob.from_string(b"")
        elif mode is not None and S_ISGITLINK(mode):
            # Submodules diff as a one-line pseudo-file, like git does.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return Blob.from_string(obj.as_raw_string())

    def lines(content: "Blob") -> list[bytes]:
        """Split blob content into lines.

        Args:
            content: Blob content

        Returns:
            List of lines (empty for an empty blob)
        """
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    # Binary content gets a one-line notice instead of a text diff,
    # unless the caller explicitly opted into diffing binaries.
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff_with_algorithm(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
                algorithm=diff_algorithm,
            )
        )

448 

449 

450# TODO(jelmer): Support writing unicode, rather than bytes. 

def gen_diff_header(
    paths: tuple[bytes | None, bytes | None],
    modes: tuple[int | None, int | None],
    shas: tuple[bytes | None, bytes | None],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
        paths: Tuple with old and new path
        modes: Tuple with old and new modes
        shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    # For creations/deletions, git shows the surviving path on both sides
    # of the "diff --git" line.
    old_path = old_path if old_path is not None else new_path
    new_path = new_path if new_path is not None else old_path
    a_name = patch_filename(old_path, b"a")
    b_name = patch_filename(new_path, b"b")
    yield b"diff --git " + a_name + b" " + b_name + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        elif old_mode is None:
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
        else:
            yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode suffix on the index line only appears when the entry
    # exists on both sides.
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"

485 

486 

487# TODO(jelmer): Support writing unicode, rather than bytes. 

def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[bytes | None, int | None, Optional["Blob"]],
    new_file: tuple[bytes | None, int | None, Optional["Blob"]],
    diff_algorithm: str | None = None,
) -> None:
    """Write blob diff.

    Args:
        f: File-like object to write to
        old_file: (path, mode, blob) tuple (None if nonexisting)
        new_file: (path, mode, blob) tuple (None if nonexisting)
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file

    def _blob_lines(blob: Optional["Blob"]) -> list[bytes]:
        """Split a blob into lines; an absent blob has none."""
        return blob.splitlines() if blob is not None else []

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff_with_algorithm(
            _blob_lines(old_blob),
            _blob_lines(new_blob),
            patch_filename(old_path, b"a"),
            patch_filename(new_path, b"b"),
            algorithm=diff_algorithm,
        )
    )

541 

542 

def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: bytes | None,
    new_tree: bytes | None,
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write tree diff.

    Args:
        f: File-like object to write to.
        store: Object store to read from
        old_tree: Old tree id
        new_tree: New tree id
        diff_binary: Whether to diff files even if they
            are considered binary files by is_binary().
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one object diff per changed entry, in the order the store
    # reports them.
    for paths, modes, shas in store.tree_changes(old_tree, new_tree):
        write_object_diff(
            f,
            store,
            (paths[0], modes[0], shas[0]),
            (paths[1], modes[1], shas[1]),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )

572 

573 

def git_am_patch_split(
    f: TextIO | BinaryIO, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
        f: File-like object to parse
        encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # Explicit arg wins, then the stream's declared encoding, then ASCII.
    effective_encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    # Pick the byte or text parser to match what the stream produced.
    if isinstance(contents, bytes):
        parsed = email.parser.BytesParser().parsebytes(contents)
    else:
        parsed = email.parser.Parser().parsestr(contents)
    return parse_patch_message(parsed, effective_encoding)

594 

595 

def parse_patch_message(
    msg: email.message.Message, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
        msg: An email message (email.message.Message)
        encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # NOTE(review): a missing From/Subject header returns None here and
    # would raise AttributeError — confirm callers guarantee the headers.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        # Strip a leading "[PATCH ...]" tag from the subject, if present.
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    # A single iterator is shared by both loops below, so the second loop
    # resumes right after the "---\n" separator.
    line_iter = iter(lines)

    # Everything before "---\n" belongs to the commit message; an initial
    # "From: " line overrides the author taken from the e-mail header.
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # Everything up to the "-- \n" signature marker is the diff body.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    # The line after the signature marker, if any, carries the git version.
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version

652 

653 

def patch_id(diff_data: bytes) -> bytes:
    """Compute patch ID for a diff.

    The patch ID is computed by normalizing the diff and computing a SHA1 hash.
    This follows git's patch-id algorithm which:
    1. Removes whitespace from lines starting with + or -
    2. Replaces line numbers in @@ headers with a canonical form
    3. Computes SHA1 of the result

    Args:
        diff_data: Raw diff data as bytes

    Returns:
        SHA1 hash of normalized diff (40-byte hex string)

    TODO: This implementation uses a simple line-by-line approach. For better
    compatibility with git's patch-id, consider using proper patch parsing that:
    - Handles edge cases in diff format (binary diffs, mode changes, etc.)
    - Properly parses unified diff format according to the spec
    - Matches git's exact normalization algorithm byte-for-byte
    See git's patch-id.c for reference implementation.
    """
    import hashlib
    import re

    # Header lines carry no content and are excluded from the hash.
    skip_prefixes = (
        b"diff --git ",
        b"index ",
        b"--- ",
        b"+++ ",
        b"new file mode ",
        b"old file mode ",
        b"deleted file mode ",
        b"new mode ",
        b"old mode ",
        b"similarity index ",
        b"dissimilarity index ",
        b"rename from ",
        b"rename to ",
        b"copy from ",
        b"copy to ",
    )
    # Hoisted out of the loop: recognizes a standard hunk header.
    hunk_re = re.compile(rb"^@@\s+-\d+(?:,\d+)?\s+\+\d+(?:,\d+)?\s+@@")

    normalized: list[bytes] = []
    for raw in diff_data.split(b"\n"):
        if raw.startswith(skip_prefixes):
            continue
        # Hunk headers collapse to a canonical "@@" so that line-number
        # shifts do not change the patch id.
        if raw.startswith(b"@@") and hunk_re.match(raw):
            normalized.append(b"@@")
            continue
        # Changed lines keep their +/- prefix but lose all whitespace.
        if raw[:1] in (b"+", b"-"):
            body = raw[1:].replace(b" ", b"").replace(b"\t", b"")
            normalized.append(raw[:1] + body)
            continue
        # Context lines and blank lines are kept verbatim; anything else
        # (e.g. a malformed @@ line) is dropped.
        if raw.startswith(b" ") or raw == b"":
            normalized.append(raw)

    return hashlib.sha1(b"\n".join(normalized)).hexdigest().encode("ascii")

733 

734 

def commit_patch_id(store: "BaseObjectStore", commit_id: bytes) -> bytes:
    """Compute patch ID for a commit.

    Args:
        store: Object store to read objects from
        commit_id: Commit ID (40-byte hex string)

    Returns:
        Patch ID (40-byte hex string)
    """
    from io import BytesIO

    commit = store[commit_id]
    assert isinstance(commit, Commit)

    if not commit.parents:
        # Root commit: diff against the empty tree.
        parent_tree = None
    else:
        # Only the first parent matters for the patch id.
        first_parent = store[commit.parents[0]]
        assert isinstance(first_parent, Commit)
        parent_tree = first_parent.tree

    buf = BytesIO()
    write_tree_diff(buf, store, parent_tree, commit.tree)
    return patch_id(buf.getvalue())

764 

765 

@dataclass
class MailinfoResult:
    """Result of mailinfo parsing.

    Attributes:
        author_name: Author's name
        author_email: Author's email address
        author_date: Author's date (if present in the email)
        subject: Processed subject line
        message: Commit message body
        patch: Patch content
        message_id: Message-ID header (if -m/--message-id was used)
    """

    author_name: str
    author_email: str
    author_date: str | None
    subject: str
    message: str
    patch: str
    # Only populated when mailinfo() was called with message_id=True.
    message_id: str | None = None

787 

788 

789def _munge_subject(subject: str, keep_subject: bool, keep_non_patch: bool) -> str: 

790 """Munge email subject line for commit message. 

791 

792 Args: 

793 subject: Original subject line 

794 keep_subject: If True, keep subject intact (-k option) 

795 keep_non_patch: If True, only strip [PATCH] (-b option) 

796 

797 Returns: 

798 Processed subject line 

799 """ 

800 if keep_subject: 

801 return subject 

802 

803 result = subject 

804 

805 # First remove Re: prefixes (they can appear before brackets) 

806 while True: 

807 new_result = re.sub(r"^\s*(?:re|RE|Re):\s*", "", result, flags=re.IGNORECASE) 

808 if new_result == result: 

809 break 

810 result = new_result 

811 

812 # Remove bracketed strings 

813 if keep_non_patch: 

814 # Only remove brackets containing "PATCH" 

815 # Match each bracket individually anywhere in the string 

816 while True: 

817 # Remove PATCH bracket, but be careful with whitespace 

818 new_result = re.sub( 

819 r"\[[^\]]*?PATCH[^\]]*?\](\s+)?", r"\1", result, flags=re.IGNORECASE 

820 ) 

821 if new_result == result: 

822 break 

823 result = new_result 

824 else: 

825 # Remove all bracketed strings 

826 while True: 

827 new_result = re.sub(r"^\s*\[.*?\]\s*", "", result) 

828 if new_result == result: 

829 break 

830 result = new_result 

831 

832 # Remove leading/trailing whitespace 

833 result = result.strip() 

834 

835 # Normalize multiple whitespace to single space 

836 result = re.sub(r"\s+", " ", result) 

837 

838 return result 

839 

840 

841def _find_scissors_line(lines: list[bytes]) -> int | None: 

842 """Find the scissors line in message body. 

843 

844 Args: 

845 lines: List of lines in the message body 

846 

847 Returns: 

848 Index of scissors line, or None if not found 

849 """ 

850 scissors_pattern = re.compile( 

851 rb"^(?:>?\s*-+\s*)?(?:8<|>8)?\s*-+\s*$|^(?:>?\s*-+\s*)(?:cut here|scissors)(?:\s*-+)?$", 

852 re.IGNORECASE, 

853 ) 

854 

855 for i, line in enumerate(lines): 

856 if scissors_pattern.match(line.strip()): 

857 return i 

858 

859 return None 

860 

861 

def mailinfo(
    msg: email.message.Message | BinaryIO | TextIO,
    keep_subject: bool = False,
    keep_non_patch: bool = False,
    encoding: str | None = None,
    scissors: bool = False,
    message_id: bool = False,
) -> MailinfoResult:
    """Extract patch information from an email message.

    This function parses an email message and extracts commit metadata
    (author, email, subject) and separates the commit message from the
    patch content, similar to git mailinfo.

    Args:
        msg: Email message (email.message.Message object) or file handle to read from
        keep_subject: If True, keep subject intact without munging (-k)
        keep_non_patch: If True, only strip [PATCH] from brackets (-b)
        encoding: Character encoding to use (default: detect from message)
        scissors: If True, remove everything before scissors line
        message_id: If True, include Message-ID in commit message (-m)

    Returns:
        MailinfoResult with parsed information

    Raises:
        ValueError: If message is malformed or missing required fields
    """
    # Parse message if given a file handle; the parser variant is chosen
    # by whether the stream yields bytes or text.
    parsed_msg: email.message.Message
    if not isinstance(msg, email.message.Message):
        if hasattr(msg, "read"):
            content = msg.read()
            if isinstance(content, bytes):
                bparser = email.parser.BytesParser()
                parsed_msg = bparser.parsebytes(content)
            else:
                sparser = email.parser.Parser()
                parsed_msg = sparser.parsestr(content)
        else:
            raise ValueError("msg must be an email.message.Message or file-like object")
    else:
        parsed_msg = msg

    # Detect encoding from message if not specified (falls back to UTF-8).
    if encoding is None:
        encoding = parsed_msg.get_content_charset() or "utf-8"

    # Extract author information; From is mandatory.
    from_header = parsed_msg.get("From", "")
    if not from_header:
        raise ValueError("Email message missing 'From' header")

    # Parse "Name <email>" format
    author_name, author_email = email.utils.parseaddr(from_header)
    if not author_email:
        raise ValueError(
            f"Could not parse email address from 'From' header: {from_header}"
        )

    # Extract date (kept as the raw header string, not parsed).
    date_header = parsed_msg.get("Date")
    author_date = date_header if date_header else None

    # Extract and process subject
    subject = parsed_msg.get("Subject", "")
    if not subject:
        subject = "(no subject)"

    # Convert Header object to string if needed
    subject = str(subject)

    # Remove newlines from subject before tag-stripping.
    subject = subject.replace("\n", " ").replace("\r", " ")
    subject = _munge_subject(subject, keep_subject, keep_non_patch)

    # Extract Message-ID if requested
    msg_id = None
    if message_id:
        msg_id = parsed_msg.get("Message-ID")

    # Get message body, normalizing the payload to bytes.
    body = parsed_msg.get_payload(decode=True)
    if body is None:
        body = b""
    elif isinstance(body, str):
        body = body.encode(encoding)
    elif not isinstance(body, bytes):
        # Handle multipart or other types
        body = str(body).encode(encoding)

    # Split into lines
    lines = body.splitlines(keepends=True)

    # Handle scissors: everything above the perforation is discarded.
    scissors_idx = None
    if scissors:
        scissors_idx = _find_scissors_line(lines)
        if scissors_idx is not None:
            # Remove everything up to and including scissors line
            lines = lines[scissors_idx + 1 :]

    # Separate commit message from patch
    # Look for the "---" separator that indicates start of diffstat/patch
    message_lines: list[bytes] = []
    patch_lines: list[bytes] = []
    in_patch = False

    for line in lines:
        if not in_patch and line == b"---\n":
            in_patch = True
            # The separator itself is kept as part of the patch content.
            patch_lines.append(line)
        elif in_patch:
            # Stop at signature marker "-- "
            if line == b"-- \n":
                break
            patch_lines.append(line)
        else:
            message_lines.append(line)

    # Build commit message
    commit_message = b"".join(message_lines).decode(encoding, errors="replace")

    # Clean up commit message
    commit_message = commit_message.strip()

    # Append Message-ID if requested
    if message_id and msg_id:
        if commit_message:
            commit_message += "\n\n"
        commit_message += f"Message-ID: {msg_id}"

    # Build patch content
    patch_content = b"".join(patch_lines).decode(encoding, errors="replace")

    return MailinfoResult(
        author_name=author_name,
        author_email=author_email,
        author_date=author_date,
        subject=subject,
        message=commit_message,
        patch=patch_content,
        message_id=msg_id,
    )