Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 9%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

825 statements  

1# patch.py -- For dealing with packed-style patches. 

2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Classes for dealing with git am-style patches. 

23 

24These patches are basically unified diffs with some extra metadata tacked 

25on. 

26""" 

27 

28__all__ = [ 

29 "DEFAULT_DIFF_ALGORITHM", 

30 "FIRST_FEW_BYTES", 

31 "DiffAlgorithmNotAvailable", 

32 "MailinfoResult", 

33 "PatchApplicationFailure", 

34 "apply_patch_hunks", 

35 "apply_patches", 

36 "commit_patch_id", 

37 "gen_diff_header", 

38 "get_summary", 

39 "git_am_patch_split", 

40 "is_binary", 

41 "mailinfo", 

42 "parse_patch_message", 

43 "patch_filename", 

44 "patch_id", 

45 "shortid", 

46 "unified_diff", 

47 "unified_diff_with_algorithm", 

48 "write_blob_diff", 

49 "write_commit_patch", 

50 "write_object_diff", 

51 "write_tree_diff", 

52] 

53 

54import email.message 

55import email.parser 

56import email.utils 

57import os 

58import re 

59import time 

60from collections.abc import Generator, Sequence 

61from dataclasses import dataclass 

62from difflib import SequenceMatcher 

63from typing import ( 

64 IO, 

65 TYPE_CHECKING, 

66 BinaryIO, 

67 TextIO, 

68) 

69 

70if TYPE_CHECKING: 

71 from .config import Config 

72 from .object_store import BaseObjectStore 

73 from .repo import Repo 

74 

75from .objects import S_ISGITLINK, Blob, Commit, ObjectID, RawObjectID 

76 

77FIRST_FEW_BYTES = 8000 

78 

79DEFAULT_DIFF_ALGORITHM = "myers" 

80 

81 

class PatchApplicationFailure(Exception):
    """Signal that a patch could not be applied cleanly."""

84 

85 

class DiffAlgorithmNotAvailable(Exception):
    """Signal that the requested diff algorithm cannot be used."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Record the algorithm name and build the error message.

        Args:
          algorithm: Name of the unavailable algorithm
          install_hint: Optional installation hint; appended to the message
            when non-empty
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        message = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            message = f"{message} {install_hint}"
        super().__init__(message)

106 

107 

108def write_commit_patch( 

109 f: IO[bytes], 

110 commit: "Commit", 

111 contents: str | bytes, 

112 progress: tuple[int, int], 

113 version: str | None = None, 

114 encoding: str | None = None, 

115) -> None: 

116 """Write a individual file patch. 

117 

118 Args: 

119 f: File-like object to write to 

120 commit: Commit object 

121 contents: Contents of the patch 

122 progress: tuple with current patch number and total. 

123 version: Version string to include in patch header 

124 encoding: Encoding to use for the patch 

125 

126 Returns: 

127 tuple with filename and contents 

128 """ 

129 encoding = encoding or getattr(f, "encoding", "ascii") 

130 if encoding is None: 

131 encoding = "ascii" 

132 if isinstance(contents, str): 

133 contents = contents.encode(encoding) 

134 (num, total) = progress 

135 f.write( 

136 b"From " 

137 + commit.id 

138 + b" " 

139 + time.ctime(commit.commit_time).encode(encoding) 

140 + b"\n" 

141 ) 

142 f.write(b"From: " + commit.author + b"\n") 

143 f.write( 

144 b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n" 

145 ) 

146 f.write( 

147 (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n" 

148 ) 

149 f.write(b"\n") 

150 f.write(b"---\n") 

151 try: 

152 import subprocess 

153 

154 p = subprocess.Popen( 

155 ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE 

156 ) 

157 except (ImportError, OSError): 

158 pass # diffstat not available? 

159 else: 

160 (diffstat, _) = p.communicate(contents) 

161 f.write(diffstat) 

162 f.write(b"\n") 

163 f.write(contents) 

164 f.write(b"-- \n") 

165 if version is None: 

166 from dulwich import __version__ as dulwich_version 

167 

168 f.write(b"Dulwich %d.%d.%d\n" % dulwich_version) 

169 else: 

170 if encoding is None: 

171 encoding = "ascii" 

172 f.write(version.encode(encoding) + b"\n") 

173 

174 

def get_summary(commit: "Commit") -> str:
    """Derive a filename-friendly summary from a commit message.

    Args:
      commit: Commit whose message is summarised
    Returns: First line of the message with spaces turned into dashes, or an
      empty string when the message has no lines
    """
    message_lines = commit.message.decode(errors="replace").splitlines()
    if not message_lines:
        return ""
    return message_lines[0].replace(" ", "-")

185 

186 

187# Unified Diff 

def _format_range_unified(start: int, stop: int) -> str:
    """Render a zero-based half-open line range in unified-diff ("ed") form.

    Args:
      start: Zero-based index of the first line in the range
      stop: Zero-based index one past the last line in the range

    Returns:
      ``"N"`` for a single line, otherwise ``"N,count"``
    """
    # Per the diff spec at http://www.unix.org/single_unix_specification/
    count = stop - start
    if count == 1:
        return str(start + 1)
    # Line numbers are one-based, except that an empty range is reported at
    # the line just before it.
    first = start if count == 0 else start + 1
    return f"{first},{count}"

198 

199 

def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py. This is the
    fixed-algorithm variant: it always diffs with difflib's SequenceMatcher
    (the "myers" choice of unified_diff_with_algorithm) and shares that
    function's formatting code instead of carrying a second copy of it.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator for the generated header lines
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output

    Returns:
      Generator yielding diff lines
    """
    yield from unified_diff_with_algorithm(
        a,
        b,
        fromfile=fromfile,
        tofile=tofile,
        fromfiledate=fromfiledate,
        tofiledate=tofiledate,
        n=n,
        lineterm=lineterm,
        tree_encoding=tree_encoding,
        output_encoding=output_encoding,
        # "myers" selects difflib.SequenceMatcher, which is what this
        # function has always used regardless of DEFAULT_DIFF_ALGORITHM.
        algorithm="myers",
    )

249 

250 

def _get_sequence_matcher(
    algorithm: str, a: Sequence[bytes], b: Sequence[bytes]
) -> SequenceMatcher[bytes]:
    """Get appropriate sequence matcher for the given algorithm.

    Args:
      algorithm: Diff algorithm ("myers" or "patience"); any value other
        than "patience" selects difflib's SequenceMatcher
      a: First sequence
      b: Second sequence

    Returns:
      Configured sequence matcher instance

    Raises:
      DiffAlgorithmNotAvailable: If patience requested but not available
    """
    if algorithm != "patience":
        return SequenceMatcher(a=a, b=b)

    # Only the optional import is guarded, so an unrelated failure while
    # constructing the matcher is not misreported as a missing dependency.
    try:
        from patiencediff import PatienceSequenceMatcher
    except ImportError as exc:
        raise DiffAlgorithmNotAvailable(
            "patience", "Install with: pip install 'dulwich[patiencediff]'"
        ) from exc
    return PatienceSequenceMatcher(None, a, b)  # type: ignore[no-any-return,unused-ignore]

278 

279 

def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: str | None = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    A line that lacks a trailing newline is followed by git's
    "\\ No newline at end of file" marker. This applies to removed and added
    lines and also to unchanged context lines, so the generated hunk always
    ends with a newline.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator for the generated header lines (content
        lines are emitted with whatever terminator they already carry)
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience")

    Returns:
      Generator yielding diff lines

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
    """
    if algorithm is None:
        algorithm = DEFAULT_DIFF_ALGORITHM

    matcher = _get_sequence_matcher(algorithm, a, b)
    no_eol_marker = b"\n\\ No newline at end of file\n"

    def terminated(line: bytes) -> bytes:
        """Return the line newline-terminated, flagging a missing newline."""
        if line[-1:] == b"\n":
            return line
        return line + no_eol_marker

    started = False
    for group in matcher.get_grouped_opcodes(n):
        if not started:
            # The file header is emitted once, and only if there is a change.
            started = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        file1_range = _format_range_unified(first[1], last[2])
        file2_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    # An unchanged final line without a newline would otherwise
                    # leave the output unterminated and fuse with whatever is
                    # written to the stream next.
                    yield b" " + terminated(line)
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + terminated(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + terminated(line)

352 

353 

def is_binary(content: bytes) -> bool:
    """Report whether the leading bytes of the content contain a NUL byte.

    Only the first FIRST_FEW_BYTES bytes are inspected.

    Args:
      content: Bytestring to check for binary content
    """
    head = content[:FIRST_FEW_BYTES]
    return b"\0" in head

361 

362 

363def shortid(hexsha: bytes | None) -> bytes: 

364 """Get short object ID. 

365 

366 Args: 

367 hexsha: Full hex SHA or None 

368 

369 Returns: 

370 7-character short ID 

371 """ 

372 if hexsha is None: 

373 return b"0" * 7 

374 else: 

375 return hexsha[:7] 

376 

377 

378def patch_filename(p: bytes | None, root: bytes) -> bytes: 

379 """Generate patch filename. 

380 

381 Args: 

382 p: Path or None 

383 root: Root directory 

384 

385 Returns: 

386 Full patch filename 

387 """ 

388 if p is None: 

389 return b"/dev/null" 

390 else: 

391 return root + b"/" + p 

392 

393 

def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[bytes | None, int | None, ObjectID | None],
    new_file: tuple[bytes | None, int | None, ObjectID | None],
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff between two stored objects.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    old_path, old_mode, old_id = old_file
    new_path, new_mode, new_id = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def load_blob(mode: int | None, hexsha: ObjectID | None) -> Blob:
        """Return the blob to diff for one side of the change.

        Args:
          mode: File mode
          hexsha: Object SHA

        Returns:
          Blob object
        """
        if hexsha is None:
            # Nonexistent side: diff against empty content.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Submodules are represented by a one-line pseudo file.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def split_lines(blob: "Blob") -> list[bytes]:
        """Split blob content into lines.

        Args:
          blob: Blob content

        Returns:
          List of lines
        """
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_blob = load_blob(old_mode, old_id)
    new_blob = load_blob(new_mode, new_id)
    if not diff_binary and (is_binary(old_blob.data) or is_binary(new_blob.data)):
        f.write(b"Binary files " + old_label + b" and " + new_label + b" differ\n")
        return
    f.writelines(
        unified_diff_with_algorithm(
            split_lines(old_blob),
            split_lines(new_blob),
            old_label,
            new_label,
            algorithm=diff_algorithm,
        )
    )

480 

481 

482# TODO(jelmer): Support writing unicode, rather than bytes. 

def gen_diff_header(
    paths: tuple[bytes | None, bytes | None],
    modes: tuple[int | None, int | None],
    shas: tuple[bytes | None, bytes | None],
) -> Generator[bytes, None, None]:
    """Generate the header lines of a blob diff.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    old_path, new_path = paths
    old_mode, new_mode = modes
    old_sha, new_sha = shas

    # When only one side has a path, the "diff --git" line names that path on
    # both sides.
    if old_path is None:
        old_path = new_path
    elif new_path is None:
        new_path = old_path
    a_name = patch_filename(old_path, b"a")
    b_name = patch_filename(new_path, b"b")
    yield b"diff --git " + a_name + b" " + b_name + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield f"deleted file mode {old_mode:o}\n".encode("ascii")
        else:
            if old_mode is not None:
                yield f"old file mode {old_mode:o}\n".encode("ascii")
            yield f"new file mode {new_mode:o}\n".encode("ascii")

    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode is appended to the index line only when both sides have one.
    if old_mode is not None and new_mode is not None:
        yield f" {new_mode:o}".encode("ascii")
    yield b"\n"

517 

518 

519# TODO(jelmer): Support writing unicode, rather than bytes. 

def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[bytes | None, int | None, "Blob | None"],
    new_file: tuple[bytes | None, int | None, "Blob | None"],
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff between two blobs that are already in memory.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None if nonexisting)
      new_file: (path, mode, blob) tuple (None if nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    old_path, old_mode, old_blob = old_file
    new_path, new_mode, new_blob = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def split_lines(blob: "Blob | None") -> list[bytes]:
        """Split blob content into lines.

        Args:
          blob: Blob object or None

        Returns:
          List of lines (empty when there is no blob)
        """
        return [] if blob is None else blob.splitlines()

    # A missing blob has no id; the header generator receives None for it.
    blob_ids = (getattr(old_blob, "id", None), getattr(new_blob, "id", None))
    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), blob_ids)
    )
    old_lines = split_lines(old_blob)
    new_lines = split_lines(new_blob)
    f.writelines(
        unified_diff_with_algorithm(
            old_lines,
            new_lines,
            old_label,
            new_label,
            algorithm=diff_algorithm,
        )
    )

573 

574 

def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: ObjectID | None,
    new_tree: ObjectID | None,
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff between two trees, one file diff per changed entry.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    for change in store.tree_changes(old_tree, new_tree):
        # Each change is a (paths, modes, shas) triple of (old, new) pairs.
        (old_path, new_path), (old_mode, new_mode), (old_sha, new_sha) = change
        old_entry = (old_path, old_mode, old_sha)
        new_entry = (new_path, new_mode, new_sha)
        write_object_diff(
            f,
            store,
            old_entry,
            new_entry,
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )

604 

605 

def git_am_patch_split(
    f: TextIO | BinaryIO, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Read a git-am-style patch and split it into its parts.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects; falls back to
        ``f.encoding`` and finally to ASCII
    Returns: Tuple with commit object, diff contents and git version
    """
    encoding = encoding or getattr(f, "encoding", None) or "ascii"
    raw = f.read()
    # Binary and text streams need different email parsers.
    if isinstance(raw, bytes):
        msg = email.parser.BytesParser().parsebytes(raw)
    else:
        msg = email.parser.Parser().parsestr(raw)
    return parse_patch_message(msg, encoding)

626 

627 

def parse_patch_message(
    msg: email.message.Message, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Extract a Commit object and patch from an e-mail message.

    The subject (minus a leading ``[PATCH ...] `` tag) becomes the first line
    of the commit message. Body lines up to a ``---`` line are appended to
    the message, lines up to a ``-- `` line form the diff, and the line after
    that is returned as the version.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits (ASCII when None)
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)

    subject = msg["subject"]
    tag_start = subject.find("[PATCH")
    if tag_start != -1:
        tag_end = subject.find("] ", tag_start)
        # A tag that is never closed by "] " is left in place rather than
        # aborting the whole parse.
        if tag_end != -1:
            subject = subject[tag_end + 2 :]
    message_parts = [(subject.replace("\n", "") + "\n").encode(encoding)]

    body = msg.get_payload(decode=True)
    if body is None:
        # get_payload(decode=True) returns None for multipart messages; treat
        # that as an empty body instead of parsing the text "None".
        lines = []
    else:
        if isinstance(body, str):
            body = body.encode(encoding)
        if isinstance(body, bytes):
            lines = body.splitlines(True)
        else:
            # Handle other types by converting to string first
            lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    first = True
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            # The first body line may override the author taken from the header.
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                message_parts.append(b"\n" + line)
            first = False
        else:
            message_parts.append(line)
    c.message = b"".join(message_parts)

    # Everything up to the "-- " signature separator is the diff.
    diff_parts = []
    for line in line_iter:
        if line == b"-- \n":
            break
        diff_parts.append(line)
    diff = b"".join(diff_parts)

    version_line = next(line_iter, None)
    version = None if version_line is None else version_line.rstrip(b"\n")
    return c, diff, version

684 

685 

def patch_id(diff_data: bytes) -> bytes:
    """Compute patch ID for a diff.

    The diff is normalized line by line and the SHA1 of the result is
    returned:

    1. File header lines (diff --git, index, ---, +++, mode, similarity,
       rename and copy lines) are dropped
    2. Well-formed @@ hunk headers are replaced by a bare ``@@``
    3. Spaces and tabs are removed from the content of + and - lines
    4. Context lines and empty lines are kept as they are; any other line
       is dropped

    Args:
        diff_data: Raw diff data as bytes

    Returns:
        SHA1 hash of normalized diff (40-byte hex string)

    TODO: This implementation uses a simple line-by-line approach. For better
    compatibility with git's patch-id, consider using proper patch parsing that:
    - Handles edge cases in diff format (binary diffs, mode changes, etc.)
    - Properly parses unified diff format according to the spec
    - Matches git's exact normalization algorithm byte-for-byte
    See git's patch-id.c for reference implementation.
    """
    import hashlib
    import re

    header_prefixes = (
        b"diff --git ",
        b"index ",
        b"--- ",
        b"+++ ",
        b"new file mode ",
        b"old file mode ",
        b"deleted file mode ",
        b"new mode ",
        b"old mode ",
        b"similarity index ",
        b"dissimilarity index ",
        b"rename from ",
        b"rename to ",
        b"copy from ",
        b"copy to ",
    )
    hunk_header = re.compile(rb"^@@\s+-\d+(?:,\d+)?\s+\+\d+(?:,\d+)?\s+@@")

    kept: list[bytes] = []
    for raw_line in diff_data.split(b"\n"):
        if raw_line.startswith(header_prefixes):
            continue
        if raw_line.startswith(b"@@") and hunk_header.match(raw_line):
            # Line numbers must not influence the ID.
            kept.append(b"@@")
        elif raw_line.startswith((b"+", b"-")):
            # Keep the marker, drop spaces and tabs from the rest.
            kept.append(raw_line[:1] + raw_line[1:].translate(None, b" \t"))
        elif raw_line == b"" or raw_line.startswith(b" "):
            kept.append(raw_line)

    digest = hashlib.sha1(b"\n".join(kept)).hexdigest()
    return digest.encode("ascii")

765 

766 

def commit_patch_id(
    store: "BaseObjectStore", commit_id: ObjectID | RawObjectID
) -> bytes:
    """Compute the patch ID of the change a commit introduces.

    The commit's tree is diffed against the tree of its first parent, or
    against no tree at all for a root commit.

    Args:
        store: Object store to read objects from
        commit_id: Commit ID (40-byte hex string)

    Returns:
        Patch ID (40-byte hex string)
    """
    from io import BytesIO

    commit = store[commit_id]
    assert isinstance(commit, Commit)

    # Root commit - compare against empty tree
    parent_tree = None
    if commit.parents:
        first_parent = store[commit.parents[0]]
        assert isinstance(first_parent, Commit)
        parent_tree = first_parent.tree

    buffer = BytesIO()
    write_tree_diff(buffer, store, parent_tree, commit.tree)
    return patch_id(buffer.getvalue())

798 

799 

800@dataclass 

801class MailinfoResult: 

802 """Result of mailinfo parsing. 

803 

804 Attributes: 

805 author_name: Author's name 

806 author_email: Author's email address 

807 author_date: Author's date (if present in the email) 

808 subject: Processed subject line 

809 message: Commit message body 

810 patch: Patch content 

811 message_id: Message-ID header (if -m/--message-id was used) 

812 """ 

813 

814 author_name: str 

815 author_email: str 

816 author_date: str | None 

817 subject: str 

818 message: str 

819 patch: str 

820 message_id: str | None = None 

821 

822 

def _munge_subject(subject: str, keep_subject: bool, keep_non_patch: bool) -> str:
    """Clean up an email subject line for use as a commit message.

    Unless the subject is kept intact, leading "Re:" prefixes are removed,
    then bracketed tags, and finally whitespace is trimmed and collapsed.

    Args:
        subject: Original subject line
        keep_subject: If True, keep subject intact (-k option)
        keep_non_patch: If True, only strip [PATCH] (-b option)

    Returns:
        Processed subject line
    """
    if keep_subject:
        return subject

    def strip_repeatedly(
        pattern: str, replacement: str, text: str, flags: int = 0
    ) -> str:
        """Apply the substitution again and again until the text is stable."""
        while True:
            stripped = re.sub(pattern, replacement, text, flags=flags)
            if stripped == text:
                return text
            text = stripped

    # First remove Re: prefixes (they can appear before brackets)
    cleaned = strip_repeatedly(
        r"^\s*(?:re|RE|Re):\s*", "", subject, flags=re.IGNORECASE
    )

    if keep_non_patch:
        # Only brackets containing "PATCH" go, wherever they are; whitespace
        # that followed the bracket is put back.
        cleaned = strip_repeatedly(
            r"\[[^\]]*?PATCH[^\]]*?\](\s+)?", r"\1", cleaned, flags=re.IGNORECASE
        )
    else:
        # Every leading bracketed string goes.
        cleaned = strip_repeatedly(r"^\s*\[.*?\]\s*", "", cleaned)

    # Trim the ends, then squeeze inner whitespace runs to a single space.
    return re.sub(r"\s+", " ", cleaned.strip())

873 

874 

875def _find_scissors_line(lines: list[bytes]) -> int | None: 

876 """Find the scissors line in message body. 

877 

878 Args: 

879 lines: List of lines in the message body 

880 

881 Returns: 

882 Index of scissors line, or None if not found 

883 """ 

884 scissors_pattern = re.compile( 

885 rb"^(?:>?\s*-+\s*)?(?:8<|>8)?\s*-+\s*$|^(?:>?\s*-+\s*)(?:cut here|scissors)(?:\s*-+)?$", 

886 re.IGNORECASE, 

887 ) 

888 

889 for i, line in enumerate(lines): 

890 if scissors_pattern.match(line.strip()): 

891 return i 

892 

893 return None 

894 

895 

def git_base85_decode(data: bytes) -> bytes:
    """Decode Git's base85-encoded binary data.

    Git uses base85 with its own line format: each line starts with a length
    character followed by base85 digits in groups of five, every group
    encoding four bytes. The length character gives the number of decoded
    bytes on that line: ``A``-``Z`` mean 1-26 and ``a``-``z`` mean 27-52.
    Padding bytes of the last group beyond that length are discarded.

    Lines that are empty or do not start with a length character are skipped.

    Args:
        data: Base85-encoded data as bytes (may contain multiple lines)

    Returns:
        Decoded binary data

    Raises:
        ValueError: If a group contains a character outside the alphabet, is
            shorter than five characters, or encodes a value that does not
            fit in 32 bits
    """
    # Git's base85 alphabet
    alphabet = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~"
    decode_table = {char: index for index, char in enumerate(alphabet)}

    result = bytearray()
    for line in data.strip().split(b"\n"):
        if not line:
            continue

        # The length character is NOT a base85 digit: it is a plain letter
        # counting the decoded bytes of this line.
        length_char = line[:1]
        if b"A" <= length_char <= b"Z":
            expected_len = line[0] - ord("A") + 1
        elif b"a" <= length_char <= b"z":
            expected_len = line[0] - ord("a") + 27
        else:
            continue

        encoded = line[1:]
        decoded_len = 0
        # Process in groups of 5 characters (which encode 4 bytes)
        for offset in range(0, len(encoded), 5):
            if decoded_len >= expected_len:
                break
            group = encoded[offset : offset + 5]
            if len(group) < 5:
                raise ValueError("Truncated base85 group")

            # Decode 5 base85 digits to a 32-bit value
            value = 0
            for char in group:
                if char not in decode_table:
                    raise ValueError(f"Invalid base85 character: {chr(char)}")
                value = value * 85 + decode_table[char]
            if value > 0xFFFFFFFF:
                raise ValueError("Base85 group does not fit in 32 bits")

            # Convert to 4 bytes (big-endian), dropping padding at line end
            take = min(4, expected_len - decoded_len)
            result += value.to_bytes(4, byteorder="big")[:take]
            decoded_len += take

    return bytes(result)

961 

962 

@dataclass
class PatchHunk:
    """One hunk of a unified diff.

    Attributes:
        old_start: Line number at which the hunk starts in the old file
        old_count: Number of old-file lines the hunk covers
        new_start: Line number at which the hunk starts in the new file
        new_count: Number of new-file lines the hunk covers
        lines: The hunk's diff lines (prefixed with ' ', '+', or '-')
    """

    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: list[bytes]

980 

981 

982@dataclass 

983class FilePatch: 

984 """Represents a patch for a single file. 

985 

986 Attributes: 

987 old_path: Path to old file (None for new files) 

988 new_path: Path to new file (None for deleted files) 

989 old_mode: Mode of old file (None for new files) 

990 new_mode: Mode of new file (None for deleted files) 

991 hunks: List of PatchHunk objects 

992 binary: True if this is a binary patch 

993 rename_from: Original path for renames (None if not a rename) 

994 rename_to: New path for renames (None if not a rename) 

995 copy_from: Source path for copies (None if not a copy) 

996 copy_to: Destination path for copies (None if not a copy) 

997 binary_old: Old binary content for binary patches (base85 encoded) 

998 binary_new: New binary content for binary patches (base85 encoded) 

999 """ 

1000 

1001 old_path: bytes | None 

1002 new_path: bytes | None 

1003 old_mode: int | None 

1004 new_mode: int | None 

1005 hunks: list[PatchHunk] 

1006 binary: bool = False 

1007 rename_from: bytes | None = None 

1008 rename_to: bytes | None = None 

1009 copy_from: bytes | None = None 

1010 copy_to: bytes | None = None 

1011 binary_old: bytes | None = None 

1012 binary_new: bytes | None = None 

1013 

1014 

def parse_unified_diff(diff_text: bytes) -> list[FilePatch]:
    """Parse a unified diff into FilePatch objects.

    Only sections introduced by a ``diff --git`` line are recognised; any
    text outside such sections is skipped.  A section that consists of
    extended headers only (for example a pure rename or a mode change) and
    is immediately followed by the next ``diff --git`` line yields a
    FilePatch without hunks.

    Hunk bodies are bounded by the line counts given in the ``@@`` header,
    so unrelated lines that follow the last hunk (such as the ``-- ``
    signature separator of ``git format-patch`` output) are not mistaken
    for removed or added lines.

    Args:
      diff_text: Unified diff content as bytes

    Returns:
      List of FilePatch objects

    Raises:
      ValueError: If a file mode header does not end in an octal number
    """
    patches: list[FilePatch] = []
    lines = diff_text.split(b"\n")
    i = 0

    while i < len(lines):
        line = lines[i]

        # Look for diff header
        if line.startswith(b"diff --git "):
            # Parse file patch
            old_path = None
            new_path = None
            old_mode = None
            new_mode = None
            hunks: list[PatchHunk] = []
            binary = False
            rename_from = None
            rename_to = None
            copy_from = None
            copy_to = None
            binary_old = None
            binary_new = None

            # Parse extended headers
            i += 1
            while i < len(lines):
                line = lines[i]

                if line.startswith(b"diff --git "):
                    # Header-only patch directly followed by the next file.
                    # Leave the line in place so the outer loop starts a new
                    # FilePatch for it instead of swallowing its header.
                    break
                elif line.startswith(b"old file mode "):
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"new file mode "):
                    new_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"deleted file mode "):
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"new mode "):
                    new_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"old mode "):
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"rename from "):
                    rename_from = line[12:].strip()
                    i += 1
                elif line.startswith(b"rename to "):
                    rename_to = line[10:].strip()
                    i += 1
                elif line.startswith(b"copy from "):
                    copy_from = line[10:].strip()
                    i += 1
                elif line.startswith(b"copy to "):
                    copy_to = line[8:].strip()
                    i += 1
                elif line.startswith(b"similarity index "):
                    # Just skip similarity index for now
                    i += 1
                elif line.startswith(b"dissimilarity index "):
                    # Just skip dissimilarity index for now
                    i += 1
                elif line.startswith(b"index "):
                    i += 1
                elif line.startswith(b"--- "):
                    # Parse old file path (anything after a tab is dropped)
                    path = line[4:].split(b"\t")[0]
                    if path != b"/dev/null":
                        old_path = path
                    i += 1
                elif line.startswith(b"+++ "):
                    # Parse new file path; this is the last header line
                    path = line[4:].split(b"\t")[0]
                    if path != b"/dev/null":
                        new_path = path
                    i += 1
                    break
                elif line.startswith(b"Binary files"):
                    binary = True
                    i += 1
                    break
                elif line.startswith(b"GIT binary patch"):
                    binary = True
                    i += 1
                    # Parse binary patch data
                    while i < len(lines):
                        line = lines[i]
                        if line.startswith(b"literal "):
                            # New binary data; the size on the "literal"
                            # line is not currently used.
                            i += 1
                            chunks: list[bytes] = []
                            while i < len(lines):
                                line = lines[i]
                                if (
                                    line.startswith(
                                        (b"literal ", b"delta ", b"diff --git ")
                                    )
                                    or not line.strip()
                                ):
                                    break
                                chunks.append(line + b"\n")
                                i += 1
                            binary_new = b"".join(chunks)
                        elif line.startswith(b"delta "):
                            # Delta patch (not supported yet): skip its data
                            i += 1
                            while i < len(lines):
                                line = lines[i]
                                if (
                                    line.startswith(
                                        (b"literal ", b"delta ", b"diff --git ")
                                    )
                                    or not line.strip()
                                ):
                                    break
                                i += 1
                        else:
                            break
                    break
                else:
                    # Unknown header line: consume it and stop header parsing
                    i += 1
                    break

            # Parse hunks
            if not binary:
                while i < len(lines):
                    line = lines[i]

                    if line.startswith(b"@@ "):
                        # Parse hunk header
                        match = re.match(
                            rb"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line
                        )
                        if match:
                            old_start = int(match.group(1))
                            old_count = int(match.group(2)) if match.group(2) else 1
                            new_start = int(match.group(3))
                            new_count = int(match.group(4)) if match.group(4) else 1

                            # Parse hunk lines, stopping once both sides of
                            # the hunk have received the announced number of
                            # lines.
                            hunk_lines: list[bytes] = []
                            remaining_old = old_count
                            remaining_new = new_count
                            i += 1
                            while i < len(lines):
                                line = lines[i]
                                if line.startswith(b"\\"):
                                    # "\ No newline at end of file" marker:
                                    # belongs to the hunk but counts for
                                    # neither side.
                                    pass
                                elif remaining_old <= 0 and remaining_new <= 0:
                                    break
                                elif line.startswith(b" "):
                                    remaining_old -= 1
                                    remaining_new -= 1
                                elif line.startswith(b"-"):
                                    remaining_old -= 1
                                elif line.startswith(b"+"):
                                    remaining_new -= 1
                                else:
                                    break
                                hunk_lines.append(line)
                                i += 1

                            hunks.append(
                                PatchHunk(
                                    old_start=old_start,
                                    old_count=old_count,
                                    new_start=new_start,
                                    new_count=new_count,
                                    lines=hunk_lines,
                                )
                            )
                        else:
                            i += 1
                    elif line.startswith(b"diff --git "):
                        # Next file patch
                        break
                    else:
                        i += 1
                        if not line.strip():
                            # Empty line, might be end of patch or separator
                            break

            patches.append(
                FilePatch(
                    old_path=old_path,
                    new_path=new_path,
                    old_mode=old_mode,
                    new_mode=new_mode,
                    hunks=hunks,
                    binary=binary,
                    rename_from=rename_from,
                    rename_to=rename_to,
                    copy_from=copy_from,
                    copy_to=copy_to,
                    binary_old=binary_old,
                    binary_new=binary_new,
                )
            )
        else:
            i += 1

    return patches

1214 

1215 

1216def apply_patch_hunks( 

1217 patch: FilePatch, 

1218 original_lines: list[bytes], 

1219) -> list[bytes] | None: 

1220 """Apply patch hunks to file content. 

1221 

1222 Args: 

1223 patch: FilePatch object to apply 

1224 original_lines: Original file content as list of lines 

1225 

1226 Returns: 

1227 Patched file content as list of lines, or None if patch cannot be applied 

1228 """ 

1229 result = original_lines[:] 

1230 offset = 0 # Track line offset as we apply hunks 

1231 

1232 for hunk in patch.hunks: 

1233 # Adjust hunk position by offset 

1234 # old_start is 1-indexed; 0 means the hunk inserts at the beginning 

1235 target_line = max(hunk.old_start - 1, 0) + offset 

1236 

1237 # Extract old and new content from hunk 

1238 old_content: list[bytes] = [] 

1239 new_content: list[bytes] = [] 

1240 

1241 for line in hunk.lines: 

1242 if line.startswith(b"\\"): 

1243 # Skip "\ No newline at end of file" markers 

1244 continue 

1245 elif line.startswith(b" "): 

1246 # Context line - add newline if not present 

1247 content = line[1:] 

1248 if not content.endswith(b"\n"): 

1249 content += b"\n" 

1250 old_content.append(content) 

1251 new_content.append(content) 

1252 elif line.startswith(b"-"): 

1253 # Deletion - add newline if not present 

1254 content = line[1:] 

1255 if not content.endswith(b"\n"): 

1256 content += b"\n" 

1257 old_content.append(content) 

1258 elif line.startswith(b"+"): 

1259 # Addition - add newline if not present 

1260 content = line[1:] 

1261 if not content.endswith(b"\n"): 

1262 content += b"\n" 

1263 new_content.append(content) 

1264 

1265 # Verify context matches 

1266 if target_line < 0 or target_line + len(old_content) > len(result): 

1267 # TODO: Implement fuzzy matching 

1268 return None 

1269 

1270 for i, old_line in enumerate(old_content): 

1271 if result[target_line + i] != old_line: 

1272 # Context doesn't match 

1273 # TODO: Implement fuzzy matching 

1274 return None 

1275 

1276 # Apply the patch 

1277 result[target_line : target_line + len(old_content)] = new_content 

1278 

1279 # Update offset for next hunk 

1280 offset += len(new_content) - len(old_content) 

1281 

1282 return result 

1283 

1284 

def _apply_rename_or_copy(
    r: "Repo",
    src_path: bytes,
    dst_path: bytes,
    strip: int,
    patch: FilePatch,
    is_rename: bool,
    cached: bool,
    check: bool,
    config: "Config | None",
) -> tuple[list[bytes] | None, bool]:
    """Carry out the rename or copy part of a file patch.

    The source content is read from the working tree, or from the index
    when the file is not present on disk.  When the patch also has hunks the
    content is handed back for the caller to patch; otherwise the content is
    written to the destination, the index is updated and, for a rename, the
    source is removed.

    Args:
      r: Repository object
      src_path: Source path
      dst_path: Destination path
      strip: Number of path components to strip
      patch: FilePatch object
      is_rename: True for rename, False for copy
      cached: Apply to index only, not working tree
      check: Check only, don't apply
      config: Repository configuration

    Returns:
      A tuple of (``original_lines``, ``should_continue``) where:
        - ``original_lines``: Content lines if hunks need to be applied, None otherwise
        - ``should_continue``: True to skip to next patch, False to continue processing

    Raises:
      ValueError: If the source is neither on disk nor a usable,
        non-conflicted blob entry in the index
    """
    from .index import ConflictedIndexEntry, IndexEntry, index_entry_from_stat

    def _strip_leading(path: bytes) -> bytes:
        # Drop ``strip`` leading components, but only when more remain.
        if strip > 0:
            components = path.split(b"/")
            if len(components) > strip:
                return b"/".join(components[strip:])
        return path

    source_rel = _strip_leading(src_path)
    target_rel = _strip_leading(dst_path)

    root = r.path.encode("utf-8") if isinstance(r.path, str) else r.path
    source_abs = os.path.join(root, source_rel)
    target_abs = os.path.join(root, target_rel)

    # Obtain the source content: working tree first, index as fallback
    operation = "rename" if is_rename else "copy"
    if os.path.exists(source_abs):
        with open(source_abs, "rb") as source_file:
            content = source_file.read()
    else:
        index = r.open_index(config=config)
        if source_rel not in index:
            raise ValueError(
                f"Cannot {operation}: source {source_rel.decode('utf-8', errors='replace')} not found"
            )
        entry = index[source_rel]
        if isinstance(entry, ConflictedIndexEntry):
            raise ValueError(
                f"Cannot {operation}: source {source_rel.decode('utf-8', errors='replace')} is conflicted"
            )
        obj = r.object_store[entry.sha]
        if not isinstance(obj, Blob):
            raise ValueError(
                f"Cannot {operation}: source {source_rel.decode('utf-8', errors='replace')} not found"
            )
        content = obj.data

    # With hunks present the caller still has to patch the content
    if patch.hunks:
        return content.splitlines(keepends=True), False

    # Pure rename/copy from here on; nothing to do when only checking
    if check:
        return None, True

    # Materialise the destination in the working tree
    if not cached:
        os.makedirs(os.path.dirname(target_abs), exist_ok=True)
        with open(target_abs, "wb") as target_file:
            target_file.write(content)
        if patch.new_mode is not None:
            os.chmod(target_abs, patch.new_mode)

    # Record the destination in the index
    index = r.open_index(config=config)
    blob = Blob.from_string(content)
    r.object_store.add_object(blob)

    if cached or not os.path.exists(target_abs):
        # No file to stat: build a minimal entry by hand
        entry = IndexEntry(
            ctime=(0, 0),
            mtime=(0, 0),
            dev=0,
            ino=0,
            mode=patch.new_mode or 0o100644,
            uid=0,
            gid=0,
            size=len(content),
            sha=blob.id,
            flags=0,
        )
    else:
        entry = index_entry_from_stat(os.stat(target_abs), blob.id, 0)

    index[target_rel] = entry

    # A rename additionally drops the source from disk and index
    if is_rename:
        if not cached and os.path.exists(source_abs):
            os.remove(source_abs)
        if source_rel in index:
            del index[source_rel]

    index.write()
    return None, True

1407 

1408 

def apply_patches(
    r: "Repo",
    patches: list[FilePatch],
    cached: bool = False,
    reverse: bool = False,
    check: bool = False,
    strip: int = 1,
    three_way: bool = False,
    *,
    config: "Config | None" = None,
) -> None:
    """Apply a list of file patches to a repository.

    The FilePatch objects (and their hunks) passed in are never modified,
    also not when ``reverse`` is set, so the same list can be applied again.

    Args:
      r: Repository object
      patches: List of FilePatch objects to apply
      cached: Apply patch to index only, not working tree
      reverse: Apply patch in reverse
      check: Only check if patch can be applied, don't apply
      strip: Number of leading path components to strip (default: 1)
      three_way: Fall back to 3-way merge if patch does not apply cleanly
      config: Repository configuration. If None, falls back to
        ``r.get_config_stack()``.

    Raises:
      ValueError: If a patch has no file path, a rename/copy source cannot
        be read, or binary patch data cannot be decoded
      NotImplementedError: If a binary patch carries no patch data
      PatchApplicationFailure: If the hunks do not apply and ``three_way``
        is not set
    """
    import copy

    from .index import ConflictedIndexEntry, IndexEntry, index_entry_from_stat

    if config is None:
        config = r.get_config_stack()

    for patch in patches:
        # Determine the file path
        # For renames/copies without hunks, old_path/new_path may be None
        # Use local variables to avoid mutating the patch object
        old_path = patch.old_path
        new_path = patch.new_path

        if new_path is None and old_path is None:
            if patch.rename_to is not None:
                # Use rename_to for the target path
                new_path = patch.rename_to
                old_path = patch.rename_from
            elif patch.copy_to is not None:
                # Use copy_to for the target path
                new_path = patch.copy_to
                old_path = patch.copy_from
            else:
                raise ValueError("Patch has no file path")

        # Choose path based on operation
        file_path: bytes
        if new_path is None:
            # Deletion
            if old_path is None:
                raise ValueError("Patch has no file path")
            file_path = old_path
        elif old_path is None:
            # Addition
            file_path = new_path
        else:
            # Modification (use new path)
            file_path = new_path

        # Strip path components
        if strip > 0:
            parts = file_path.split(b"/")
            if len(parts) > strip:
                file_path = b"/".join(parts[strip:])

        # Convert to filesystem path
        tree_path = file_path
        fs_path = os.path.join(
            r.path.encode("utf-8") if isinstance(r.path, str) else r.path, file_path
        )

        # Handle renames and copies
        original_lines: list[bytes] | None = None
        if patch.rename_from is not None and patch.rename_to is not None:
            original_lines, should_continue = _apply_rename_or_copy(
                r,
                patch.rename_from,
                patch.rename_to,
                strip,
                patch,
                is_rename=True,
                cached=cached,
                check=check,
                config=config,
            )
            if should_continue:
                continue
        elif patch.copy_from is not None and patch.copy_to is not None:
            original_lines, should_continue = _apply_rename_or_copy(
                r,
                patch.copy_from,
                patch.copy_to,
                strip,
                patch,
                is_rename=False,
                cached=cached,
                check=check,
                config=config,
            )
            if should_continue:
                continue

        # Handle binary patches
        if patch.binary:
            if patch.binary_new is not None:
                # Decode binary patch
                try:
                    binary_content = git_base85_decode(patch.binary_new)
                except (ValueError, KeyError) as e:
                    # Chain the cause so the original traceback is kept
                    raise ValueError(f"Failed to decode binary patch: {e}") from e

                if check:
                    # Just checking, don't actually apply
                    continue

                # Write binary file
                if not cached:
                    os.makedirs(os.path.dirname(fs_path), exist_ok=True)
                    with open(fs_path, "wb") as f:
                        f.write(binary_content)
                    if patch.new_mode is not None:
                        os.chmod(fs_path, patch.new_mode)

                # Update index
                index = r.open_index(config=config)
                blob = Blob.from_string(binary_content)
                r.object_store.add_object(blob)

                if not cached and os.path.exists(fs_path):
                    st = os.stat(fs_path)
                    entry = index_entry_from_stat(st, blob.id, 0)
                else:
                    entry = IndexEntry(
                        ctime=(0, 0),
                        mtime=(0, 0),
                        dev=0,
                        ino=0,
                        mode=patch.new_mode or 0o100644,
                        uid=0,
                        gid=0,
                        size=len(binary_content),
                        sha=blob.id,
                        flags=0,
                    )

                index[tree_path] = entry
                index.write()
                continue
            else:
                # Old-style "Binary files differ" message without actual patch data
                raise NotImplementedError(
                    "Binary patch detected but no patch data provided (use git diff --binary)"
                )

        # Read original file content (unless already loaded from rename/copy)
        if original_lines is None:
            if patch.old_path is None:
                # New file
                original_lines = []
            else:
                if os.path.exists(fs_path):
                    with open(fs_path, "rb") as f:
                        content = f.read()
                    original_lines = content.splitlines(keepends=True)
                else:
                    # File doesn't exist - check if it's in the index
                    try:
                        index = r.open_index(config=config)
                        if tree_path in index:
                            index_entry: IndexEntry | ConflictedIndexEntry = index[
                                tree_path
                            ]
                            if not isinstance(index_entry, ConflictedIndexEntry):
                                obj = r.object_store[index_entry.sha]
                                if isinstance(obj, Blob):
                                    original_lines = obj.data.splitlines(keepends=True)
                                else:
                                    original_lines = []
                            else:
                                original_lines = []
                        else:
                            original_lines = []
                    except (KeyError, FileNotFoundError):
                        original_lines = []

        # Reverse patch if requested.  The reversal is done on copies so the
        # caller's FilePatch/PatchHunk objects stay as they were; mutating
        # them in place would silently undo the reversal on a second call.
        # NOTE(review): only the hunks are reversed here; paths and modes of
        # the patch are used as given.
        patch_to_apply = patch
        if reverse:
            reversed_hunks = []
            for hunk in patch.hunks:
                reversed_hunk = copy.copy(hunk)
                # Swap old and new in the hunk
                reversed_hunk.old_start = hunk.new_start
                reversed_hunk.new_start = hunk.old_start
                reversed_hunk.old_count = hunk.new_count
                reversed_hunk.new_count = hunk.old_count
                # Swap +/- prefixes
                reversed_lines = []
                for line in hunk.lines:
                    if line.startswith(b"+"):
                        reversed_lines.append(b"-" + line[1:])
                    elif line.startswith(b"-"):
                        reversed_lines.append(b"+" + line[1:])
                    else:
                        reversed_lines.append(line)
                reversed_hunk.lines = reversed_lines
                reversed_hunks.append(reversed_hunk)
            patch_to_apply = copy.copy(patch)
            patch_to_apply.hunks = reversed_hunks

        # Apply the patch
        assert original_lines is not None
        result = apply_patch_hunks(patch_to_apply, original_lines)

        if result is None and three_way:
            # Try 3-way merge fallback
            from .merge import merge_blobs

            # Reconstruct base version from the patch
            # Base is what you get by taking only the old lines from hunks
            base_lines = []
            theirs_lines = []

            for hunk in patch_to_apply.hunks:
                for line in hunk.lines:
                    if line.startswith(b"\\"):
                        # Skip "\ No newline at end of file" markers
                        continue
                    elif line.startswith(b" "):
                        # Context line - in both base and theirs
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        base_lines.append(content)
                        theirs_lines.append(content)
                    elif line.startswith(b"-"):
                        # Deletion - only in base
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        base_lines.append(content)
                    elif line.startswith(b"+"):
                        # Addition - only in theirs
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        theirs_lines.append(content)

            # Create blobs for merging
            base_content = b"".join(base_lines)
            ours_content = b"".join(original_lines)
            theirs_content = b"".join(theirs_lines)

            base_blob = Blob.from_string(base_content) if base_content else None
            ours_blob = Blob.from_string(ours_content) if ours_content else None
            theirs_blob = Blob.from_string(theirs_content)

            # Perform 3-way merge
            merged_content, _had_conflicts = merge_blobs(
                base_blob, ours_blob, theirs_blob, path=tree_path
            )

            result = merged_content.splitlines(keepends=True)

            # Note: if _had_conflicts is True, the result contains conflict markers
            # Git would exit with error code, but we continue processing
        elif result is None:
            raise PatchApplicationFailure(
                f"Patch does not apply to {file_path.decode('utf-8', errors='replace')}"
            )

        if check:
            # Just checking, don't actually apply
            continue

        # Write result
        result_content = b"".join(result)

        if patch.new_path is None:
            # File deletion
            if not cached and os.path.exists(fs_path):
                os.remove(fs_path)
            # Remove from index
            index = r.open_index(config=config)
            if tree_path in index:
                del index[tree_path]
            index.write()
        else:
            # File addition or modification
            if not cached:
                # Write to working tree
                os.makedirs(os.path.dirname(fs_path), exist_ok=True)
                with open(fs_path, "wb") as f:
                    f.write(result_content)

                # Update file mode if specified
                if patch.new_mode is not None:
                    os.chmod(fs_path, patch.new_mode)

            # Update index
            index = r.open_index(config=config)
            blob = Blob.from_string(result_content)
            r.object_store.add_object(blob)

            # Get file stat for index entry
            if not cached and os.path.exists(fs_path):
                st = os.stat(fs_path)
                entry = index_entry_from_stat(st, blob.id, 0)
            else:
                # Create a minimal index entry for cached-only changes
                entry = IndexEntry(
                    ctime=(0, 0),
                    mtime=(0, 0),
                    dev=0,
                    ino=0,
                    mode=patch.new_mode or 0o100644,
                    uid=0,
                    gid=0,
                    size=len(result_content),
                    sha=blob.id,
                    flags=0,
                )

            index[tree_path] = entry

            # Handle cleanup for renames with hunks
            if patch.rename_from is not None and patch.rename_to is not None:
                # Remove old file after successful rename
                old_rename_path = patch.rename_from
                if strip > 0:
                    old_parts = old_rename_path.split(b"/")
                    if len(old_parts) > strip:
                        old_rename_path = b"/".join(old_parts[strip:])

                old_fs_path = os.path.join(
                    r.path.encode("utf-8") if isinstance(r.path, str) else r.path,
                    old_rename_path,
                )

                if not cached and os.path.exists(old_fs_path):
                    os.remove(old_fs_path)
                if old_rename_path in index:
                    del index[old_rename_path]

            index.write()

1752 

1753 

def mailinfo(
    msg: email.message.Message | BinaryIO | TextIO,
    keep_subject: bool = False,
    keep_non_patch: bool = False,
    encoding: str | None = None,
    scissors: bool = False,
    message_id: bool = False,
) -> MailinfoResult:
    """Extract patch information from an email message.

    This function parses an email message and extracts commit metadata
    (author, email, subject) and separates the commit message from the
    patch content, similar to git mailinfo.

    The body is split at the first line consisting of ``---``; everything
    from that line up to (but not including) a ``-- `` signature line is
    the patch.  Both markers are recognised regardless of whether the line
    ends in LF, CRLF or has no terminator at all.

    Args:
      msg: Email message (email.message.Message object) or file handle to read from
      keep_subject: If True, keep subject intact without munging (-k)
      keep_non_patch: If True, only strip [PATCH] from brackets (-b)
      encoding: Character encoding to use (default: detect from message)
      scissors: If True, remove everything before scissors line
      message_id: If True, include Message-ID in commit message (-m)

    Returns:
      MailinfoResult with parsed information

    Raises:
      ValueError: If ``msg`` is neither a message nor file-like, the 'From'
        header is missing, or no email address can be parsed from it
    """
    # Parse message if given a file handle
    parsed_msg: email.message.Message
    if not isinstance(msg, email.message.Message):
        if hasattr(msg, "read"):
            content = msg.read()
            if isinstance(content, bytes):
                bparser = email.parser.BytesParser()
                parsed_msg = bparser.parsebytes(content)
            else:
                sparser = email.parser.Parser()
                parsed_msg = sparser.parsestr(content)
        else:
            raise ValueError("msg must be an email.message.Message or file-like object")
    else:
        parsed_msg = msg

    # Detect encoding from message if not specified
    if encoding is None:
        encoding = parsed_msg.get_content_charset() or "utf-8"

    # Extract author information
    from_header = parsed_msg.get("From", "")
    if not from_header:
        raise ValueError("Email message missing 'From' header")

    # Parse "Name <email>" format
    author_name, author_email = email.utils.parseaddr(from_header)
    if not author_email:
        raise ValueError(
            f"Could not parse email address from 'From' header: {from_header}"
        )

    # Extract date
    date_header = parsed_msg.get("Date")
    author_date = date_header if date_header else None

    # Extract and process subject
    subject = parsed_msg.get("Subject", "")
    if not subject:
        subject = "(no subject)"

    # Convert Header object to string if needed
    subject = str(subject)

    # Remove newlines from subject
    subject = subject.replace("\n", " ").replace("\r", " ")
    subject = _munge_subject(subject, keep_subject, keep_non_patch)

    # Extract Message-ID if requested
    msg_id = None
    if message_id:
        msg_id = parsed_msg.get("Message-ID")

    # Get message body
    body = parsed_msg.get_payload(decode=True)
    if body is None:
        body = b""
    elif isinstance(body, str):
        body = body.encode(encoding)
    elif not isinstance(body, bytes):
        # Handle multipart or other types
        body = str(body).encode(encoding)

    # Split into lines
    lines = body.splitlines(keepends=True)

    # Handle scissors
    scissors_idx = None
    if scissors:
        scissors_idx = _find_scissors_line(lines)
        if scissors_idx is not None:
            # Remove everything up to and including scissors line
            lines = lines[scissors_idx + 1 :]

    # Separate commit message from patch
    # Look for the "---" separator that indicates start of diffstat/patch
    message_lines: list[bytes] = []
    patch_lines: list[bytes] = []
    in_patch = False

    for line in lines:
        # Compare without the line terminator so that CRLF bodies and an
        # unterminated last line are handled like plain LF ones.
        bare_line = line.rstrip(b"\r\n")
        if not in_patch and bare_line == b"---":
            in_patch = True
            patch_lines.append(line)
        elif in_patch:
            # Stop at signature marker "-- "
            if bare_line == b"-- ":
                break
            patch_lines.append(line)
        else:
            message_lines.append(line)

    # Build commit message
    commit_message = b"".join(message_lines).decode(encoding, errors="replace")

    # Clean up commit message
    commit_message = commit_message.strip()

    # Append Message-ID if requested
    if message_id and msg_id:
        if commit_message:
            commit_message += "\n\n"
        commit_message += f"Message-ID: {msg_id}"

    # Build patch content
    patch_content = b"".join(patch_lines).decode(encoding, errors="replace")

    return MailinfoResult(
        author_name=author_name,
        author_email=author_email,
        author_date=author_date,
        subject=subject,
        message=commit_message,
        patch=patch_content,
        message_id=msg_id,
    )