Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 9%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

822 statements  

1# patch.py -- For dealing with packed-style patches. 

2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Classes for dealing with git am-style patches. 

23 

24These patches are basically unified diffs with some extra metadata tacked 

25on. 

26""" 

27 

# Public API of this module (consumed by ``from dulwich.patch import *`` and
# by API documentation tooling).
__all__ = [
    "DEFAULT_DIFF_ALGORITHM",
    "FIRST_FEW_BYTES",
    "DiffAlgorithmNotAvailable",
    "MailinfoResult",
    "PatchApplicationFailure",
    "apply_patch_hunks",
    "apply_patches",
    "commit_patch_id",
    "gen_diff_header",
    "get_summary",
    "git_am_patch_split",
    "is_binary",
    "mailinfo",
    "parse_patch_message",
    "patch_filename",
    "patch_id",
    "shortid",
    "unified_diff",
    "unified_diff_with_algorithm",
    "write_blob_diff",
    "write_commit_patch",
    "write_object_diff",
    "write_tree_diff",
]

53 

54import email.message 

55import email.parser 

56import email.utils 

57import os 

58import re 

59import time 

60from collections.abc import Generator, Sequence 

61from dataclasses import dataclass 

62from difflib import SequenceMatcher 

63from typing import ( 

64 IO, 

65 TYPE_CHECKING, 

66 BinaryIO, 

67 TextIO, 

68) 

69 

70if TYPE_CHECKING: 

71 from .object_store import BaseObjectStore 

72 from .repo import Repo 

73 

74from .objects import S_ISGITLINK, Blob, Commit, ObjectID, RawObjectID 

75 

# Number of leading bytes inspected by is_binary() when sniffing for NUL bytes.
FIRST_FEW_BYTES = 8000

# Diff algorithm used when callers do not request one explicitly
# ("myers" or "patience"; see _get_sequence_matcher).
DEFAULT_DIFF_ALGORITHM = "myers"

79 

80 

class PatchApplicationFailure(Exception):
    """Raised when a patch does not apply cleanly.

    NOTE(review): presumably raised by ``apply_patches``/``apply_patch_hunks``
    (both exported in ``__all__``); confirm against their implementations.
    """

83 

84 

class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
            algorithm: Name of the unavailable algorithm
            install_hint: Optional installation hint appended to the message
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        # Build the message once; the hint, when given, is appended after a
        # single space.
        message = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            message = f"{message} {install_hint}"
        super().__init__(message)

105 

106 

107def write_commit_patch( 

108 f: IO[bytes], 

109 commit: "Commit", 

110 contents: str | bytes, 

111 progress: tuple[int, int], 

112 version: str | None = None, 

113 encoding: str | None = None, 

114) -> None: 

115 """Write a individual file patch. 

116 

117 Args: 

118 f: File-like object to write to 

119 commit: Commit object 

120 contents: Contents of the patch 

121 progress: tuple with current patch number and total. 

122 version: Version string to include in patch header 

123 encoding: Encoding to use for the patch 

124 

125 Returns: 

126 tuple with filename and contents 

127 """ 

128 encoding = encoding or getattr(f, "encoding", "ascii") 

129 if encoding is None: 

130 encoding = "ascii" 

131 if isinstance(contents, str): 

132 contents = contents.encode(encoding) 

133 (num, total) = progress 

134 f.write( 

135 b"From " 

136 + commit.id 

137 + b" " 

138 + time.ctime(commit.commit_time).encode(encoding) 

139 + b"\n" 

140 ) 

141 f.write(b"From: " + commit.author + b"\n") 

142 f.write( 

143 b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n" 

144 ) 

145 f.write( 

146 (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n" 

147 ) 

148 f.write(b"\n") 

149 f.write(b"---\n") 

150 try: 

151 import subprocess 

152 

153 p = subprocess.Popen( 

154 ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE 

155 ) 

156 except (ImportError, OSError): 

157 pass # diffstat not available? 

158 else: 

159 (diffstat, _) = p.communicate(contents) 

160 f.write(diffstat) 

161 f.write(b"\n") 

162 f.write(contents) 

163 f.write(b"-- \n") 

164 if version is None: 

165 from dulwich import __version__ as dulwich_version 

166 

167 f.write(b"Dulwich %d.%d.%d\n" % dulwich_version) 

168 else: 

169 if encoding is None: 

170 encoding = "ascii" 

171 f.write(version.encode(encoding) + b"\n") 

172 

173 

def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
        commit: Commit
    Returns: First line of the commit message with spaces replaced by
        dashes, or the empty string for an empty message.
    """
    text = commit.message.decode(errors="replace")
    for first_line in text.splitlines():
        # Only the first line matters; dashes make it filename-friendly.
        return first_line.replace(" ", "-")
    return ""

184 

185 

186# Unified Diff 

187def _format_range_unified(start: int, stop: int) -> str: 

188 """Convert range to the "ed" format.""" 

189 # Per the diff spec at http://www.unix.org/single_unix_specification/ 

190 beginning = start + 1 # lines start numbering with one 

191 length = stop - start 

192 if length == 1: 

193 return f"{beginning}" 

194 if not length: 

195 beginning -= 1 # empty ranges begin at line just before the range 

196 return f"{beginning},{length}" 

197 

198 

def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def hunk_range(start: int, stop: int) -> str:
        # Inline "ed"-style range formatter (1-based line numbers).
        length = stop - start
        if length == 1:
            return str(start + 1)
        begin = start if length == 0 else start + 1
        return f"{begin},{length}"

    def with_newline_marker(line: bytes) -> bytes:
        # git appends a backslash note when the final line has no newline.
        if line[-1:] != b"\n":
            return line + b"\n\\ No newline at end of file\n"
        return line

    header_written = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not header_written:
            header_written = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        old_range = hunk_range(first[1], last[2])
        new_range = hunk_range(first[3], last[4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + with_newline_marker(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + with_newline_marker(line)

248 

249 

250def _get_sequence_matcher( 

251 algorithm: str, a: Sequence[bytes], b: Sequence[bytes] 

252) -> SequenceMatcher[bytes]: 

253 """Get appropriate sequence matcher for the given algorithm. 

254 

255 Args: 

256 algorithm: Diff algorithm ("myers" or "patience") 

257 a: First sequence 

258 b: Second sequence 

259 

260 Returns: 

261 Configured sequence matcher instance 

262 

263 Raises: 

264 DiffAlgorithmNotAvailable: If patience requested but not available 

265 """ 

266 if algorithm == "patience": 

267 try: 

268 from patiencediff import PatienceSequenceMatcher 

269 

270 return PatienceSequenceMatcher(None, a, b) # type: ignore[no-any-return,unused-ignore] 

271 except ImportError: 

272 raise DiffAlgorithmNotAvailable( 

273 "patience", "Install with: pip install 'dulwich[patiencediff]'" 

274 ) 

275 else: 

276 return SequenceMatcher(a=a, b=b) 

277 

278 

def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: str | None = None,
) -> Generator[bytes, None, None]:
    """Generate a git-style unified diff using the given diff algorithm.

    Args:
        a: First sequence of lines
        b: Second sequence of lines
        fromfile: Name of first file
        tofile: Name of second file
        fromfiledate: Date of first file
        tofiledate: Date of second file
        n: Number of context lines
        lineterm: Line terminator
        tree_encoding: Encoding for tree paths
        output_encoding: Encoding for output
        algorithm: Diff algorithm to use ("myers" or "patience"); falls back
            to DEFAULT_DIFF_ALGORITHM when None

    Returns:
        Generator yielding diff lines

    Raises:
        DiffAlgorithmNotAvailable: If patience algorithm requested but
            patiencediff is not installed
    """
    chosen = DEFAULT_DIFF_ALGORITHM if algorithm is None else algorithm
    matcher = _get_sequence_matcher(chosen, a, b)

    def with_newline_marker(line: bytes) -> bytes:
        # git marks a missing trailing newline with a backslash note line.
        if line[-1:] != b"\n":
            return line + b"\n\\ No newline at end of file\n"
        return line

    wrote_header = False
    for group in matcher.get_grouped_opcodes(n):
        if not wrote_header:
            wrote_header = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        old_range = _format_range_unified(first[1], last[2])
        new_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + with_newline_marker(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + with_newline_marker(line)

351 

352 

def is_binary(content: bytes) -> bool:
    """Heuristically detect binary content.

    Args:
        content: Bytestring to check for binary content

    Returns:
        True when a NUL byte occurs within the first FIRST_FEW_BYTES bytes.
    """
    head = content[:FIRST_FEW_BYTES]
    return head.find(b"\0") != -1

360 

361 

362def shortid(hexsha: bytes | None) -> bytes: 

363 """Get short object ID. 

364 

365 Args: 

366 hexsha: Full hex SHA or None 

367 

368 Returns: 

369 7-character short ID 

370 """ 

371 if hexsha is None: 

372 return b"0" * 7 

373 else: 

374 return hexsha[:7] 

375 

376 

377def patch_filename(p: bytes | None, root: bytes) -> bytes: 

378 """Generate patch filename. 

379 

380 Args: 

381 p: Path or None 

382 root: Root directory 

383 

384 Returns: 

385 Full patch filename 

386 """ 

387 if p is None: 

388 return b"/dev/null" 

389 else: 

390 return root + b"/" + p 

391 

392 

def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[bytes | None, int | None, ObjectID | None],
    new_file: tuple[bytes | None, int | None, ObjectID | None],
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff for an object.

    Args:
        f: File-like object to write to
        store: Store to retrieve objects from, if necessary
        old_file: (path, mode, hexsha) tuple
        new_file: (path, mode, hexsha) tuple
        diff_binary: Whether to diff files even if they
            are considered binary files by is_binary().
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: int | None, hexsha: ObjectID | None) -> Blob:
        """Get blob content for a file.

        Args:
            mode: File mode
            hexsha: Object SHA

        Returns:
            Blob object
        """
        if hexsha is None:
            # Nonexistent side of the diff: treat it as empty content.
            return Blob.from_string(b"")
        elif mode is not None and S_ISGITLINK(mode):
            # Gitlink (submodule) entry: diff the recorded commit id line.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return Blob.from_string(obj.as_raw_string())

    def lines(content: "Blob") -> list[bytes]:
        """Split blob content into lines.

        Args:
            content: Blob content

        Returns:
            List of lines (empty for empty blobs)
        """
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    # Binary content gets a one-line summary instead of a text diff, unless
    # diff_binary forces a textual diff.
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff_with_algorithm(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
                algorithm=diff_algorithm,
            )
        )

479 

480 

481# TODO(jelmer): Support writing unicode, rather than bytes. 

482def gen_diff_header( 

483 paths: tuple[bytes | None, bytes | None], 

484 modes: tuple[int | None, int | None], 

485 shas: tuple[bytes | None, bytes | None], 

486) -> Generator[bytes, None, None]: 

487 """Write a blob diff header. 

488 

489 Args: 

490 paths: Tuple with old and new path 

491 modes: Tuple with old and new modes 

492 shas: Tuple with old and new shas 

493 """ 

494 (old_path, new_path) = paths 

495 (old_mode, new_mode) = modes 

496 (old_sha, new_sha) = shas 

497 if old_path is None and new_path is not None: 

498 old_path = new_path 

499 if new_path is None and old_path is not None: 

500 new_path = old_path 

501 old_path = patch_filename(old_path, b"a") 

502 new_path = patch_filename(new_path, b"b") 

503 yield b"diff --git " + old_path + b" " + new_path + b"\n" 

504 

505 if old_mode != new_mode: 

506 if new_mode is not None: 

507 if old_mode is not None: 

508 yield (f"old file mode {old_mode:o}\n").encode("ascii") 

509 yield (f"new file mode {new_mode:o}\n").encode("ascii") 

510 else: 

511 yield (f"deleted file mode {old_mode:o}\n").encode("ascii") 

512 yield b"index " + shortid(old_sha) + b".." + shortid(new_sha) 

513 if new_mode is not None and old_mode is not None: 

514 yield (f" {new_mode:o}").encode("ascii") 

515 yield b"\n" 

516 

517 

518# TODO(jelmer): Support writing unicode, rather than bytes. 

def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[bytes | None, int | None, "Blob | None"],
    new_file: tuple[bytes | None, int | None, "Blob | None"],
    diff_algorithm: str | None = None,
) -> None:
    """Write a diff between two blobs.

    Args:
        f: File-like object to write to
        old_file: (path, mode, blob) tuple (elements None if nonexisting)
        new_file: (path, mode, blob) tuple (elements None if nonexisting)
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file

    def blob_lines(blob: "Blob | None") -> list[bytes]:
        """Return the blob's lines, or an empty list for a missing blob."""
        return [] if blob is None else blob.splitlines()

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            # A missing blob contributes a None sha to the index line.
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff_with_algorithm(
            blob_lines(old_blob),
            blob_lines(new_blob),
            patch_filename(old_path, b"a"),
            patch_filename(new_path, b"b"),
            algorithm=diff_algorithm,
        )
    )

572 

573 

def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: ObjectID | None,
    new_tree: ObjectID | None,
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff between two trees, one changed object at a time.

    Args:
        f: File-like object to write to.
        store: Object store to read from
        old_tree: Old tree id
        new_tree: New tree id
        diff_binary: Whether to diff files even if they
            are considered binary files by is_binary().
        diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    for change in store.tree_changes(old_tree, new_tree):
        # Each change is ((oldpath, newpath), (oldmode, newmode),
        # (oldsha, newsha)).
        (paths, modes, shas) = change
        write_object_diff(
            f,
            store,
            (paths[0], modes[0], shas[0]),
            (paths[1], modes[1], shas[1]),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )

603 

604 

def git_am_patch_split(
    f: TextIO | BinaryIO, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
        f: File-like object to parse
        encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # Prefer the explicit encoding, then the stream's encoding, then ASCII.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    # Pick the parser that matches the stream type (bytes vs text).
    if isinstance(contents, bytes):
        msg = email.parser.BytesParser().parsebytes(contents)
    else:
        msg = email.parser.Parser().parsestr(contents)
    return parse_patch_message(msg, encoding)

625 

626 

def parse_patch_message(
    msg: email.message.Message, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
        msg: An email message (email.message.Message)
        encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # NOTE(review): assumes a "From" header is present; a message without one
    # would raise AttributeError here — confirm callers guarantee it.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        # No "[PATCH" tag: use the subject as-is.
        subject = msg["subject"]
    else:
        # Strip everything up to and including the "] " closing the
        # "[PATCH n/m]" tag.
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    # Body text up to the "---" separator extends the commit message; a
    # leading "From: " line overrides the author taken from the headers.
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # Everything between "---" and the "-- " signature marker is the diff.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    # The line following the signature marker carries the git/dulwich
    # version, when present.
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version

683 

684 

def patch_id(diff_data: bytes) -> bytes:
    """Compute patch ID for a diff.

    The patch ID is computed by normalizing the diff and computing a SHA1 hash.
    This follows git's patch-id algorithm which:
    1. Removes whitespace from lines starting with + or -
    2. Replaces line numbers in @@ headers with a canonical form
    3. Computes SHA1 of the result

    Args:
        diff_data: Raw diff data as bytes

    Returns:
        SHA1 hash of normalized diff (40-byte hex string)

    TODO: This implementation uses a simple line-by-line approach. For better
    compatibility with git's patch-id, consider using proper patch parsing that:
    - Handles edge cases in diff format (binary diffs, mode changes, etc.)
    - Properly parses unified diff format according to the spec
    - Matches git's exact normalization algorithm byte-for-byte
    See git's patch-id.c for reference implementation.
    """
    import hashlib
    import re

    # File-level headers carry no semantic content for the patch id.
    header_prefixes = (
        b"diff --git ",
        b"index ",
        b"--- ",
        b"+++ ",
        b"new file mode ",
        b"old file mode ",
        b"deleted file mode ",
        b"new mode ",
        b"old mode ",
        b"similarity index ",
        b"dissimilarity index ",
        b"rename from ",
        b"rename to ",
        b"copy from ",
        b"copy to ",
    )
    hunk_header = re.compile(rb"^@@\s+-\d+(?:,\d+)?\s+\+\d+(?:,\d+)?\s+@@")

    normalized: list[bytes] = []
    for raw in diff_data.split(b"\n"):
        if raw.startswith(header_prefixes):
            continue
        if raw.startswith(b"@@") and hunk_header.match(raw):
            # Canonical hunk marker: line offsets must not affect the id.
            normalized.append(b"@@")
            continue
        if raw.startswith((b"+", b"-")):
            # Keep the +/- prefix, drop all whitespace from the content.
            normalized.append(
                raw[:1] + raw[1:].replace(b" ", b"").replace(b"\t", b"")
            )
            continue
        # Context and blank lines are hashed verbatim; anything else (e.g.
        # "\ No newline" markers) is dropped.
        if raw.startswith(b" ") or raw == b"":
            normalized.append(raw)

    return hashlib.sha1(b"\n".join(normalized)).hexdigest().encode("ascii")

764 

765 

def commit_patch_id(
    store: "BaseObjectStore", commit_id: ObjectID | RawObjectID
) -> bytes:
    """Compute patch ID for a commit.

    Args:
        store: Object store to read objects from
        commit_id: Commit ID (40-byte hex string)

    Returns:
        Patch ID (40-byte hex string)
    """
    from io import BytesIO

    commit = store[commit_id]
    assert isinstance(commit, Commit)

    # Diff against the first parent's tree; a root commit is compared
    # against the empty tree (None).
    parent_tree = None
    if commit.parents:
        first_parent = store[commit.parents[0]]
        assert isinstance(first_parent, Commit)
        parent_tree = first_parent.tree

    buf = BytesIO()
    write_tree_diff(buf, store, parent_tree, commit.tree)
    return patch_id(buf.getvalue())

797 

798 

@dataclass
class MailinfoResult:
    """Result of mailinfo parsing.

    Attributes:
        author_name: Author's name
        author_email: Author's email address
        author_date: Author's date (if present in the email)
        subject: Processed subject line
        message: Commit message body
        patch: Patch content
        message_id: Message-ID header (if -m/--message-id was used)
    """

    author_name: str
    author_email: str
    author_date: str | None  # None when the email carried no date
    subject: str
    message: str
    patch: str
    message_id: str | None = None  # only set when message-id tracking is enabled

820 

821 

822def _munge_subject(subject: str, keep_subject: bool, keep_non_patch: bool) -> str: 

823 """Munge email subject line for commit message. 

824 

825 Args: 

826 subject: Original subject line 

827 keep_subject: If True, keep subject intact (-k option) 

828 keep_non_patch: If True, only strip [PATCH] (-b option) 

829 

830 Returns: 

831 Processed subject line 

832 """ 

833 if keep_subject: 

834 return subject 

835 

836 result = subject 

837 

838 # First remove Re: prefixes (they can appear before brackets) 

839 while True: 

840 new_result = re.sub(r"^\s*(?:re|RE|Re):\s*", "", result, flags=re.IGNORECASE) 

841 if new_result == result: 

842 break 

843 result = new_result 

844 

845 # Remove bracketed strings 

846 if keep_non_patch: 

847 # Only remove brackets containing "PATCH" 

848 # Match each bracket individually anywhere in the string 

849 while True: 

850 # Remove PATCH bracket, but be careful with whitespace 

851 new_result = re.sub( 

852 r"\[[^\]]*?PATCH[^\]]*?\](\s+)?", r"\1", result, flags=re.IGNORECASE 

853 ) 

854 if new_result == result: 

855 break 

856 result = new_result 

857 else: 

858 # Remove all bracketed strings 

859 while True: 

860 new_result = re.sub(r"^\s*\[.*?\]\s*", "", result) 

861 if new_result == result: 

862 break 

863 result = new_result 

864 

865 # Remove leading/trailing whitespace 

866 result = result.strip() 

867 

868 # Normalize multiple whitespace to single space 

869 result = re.sub(r"\s+", " ", result) 

870 

871 return result 

872 

873 

874def _find_scissors_line(lines: list[bytes]) -> int | None: 

875 """Find the scissors line in message body. 

876 

877 Args: 

878 lines: List of lines in the message body 

879 

880 Returns: 

881 Index of scissors line, or None if not found 

882 """ 

883 scissors_pattern = re.compile( 

884 rb"^(?:>?\s*-+\s*)?(?:8<|>8)?\s*-+\s*$|^(?:>?\s*-+\s*)(?:cut here|scissors)(?:\s*-+)?$", 

885 re.IGNORECASE, 

886 ) 

887 

888 for i, line in enumerate(lines): 

889 if scissors_pattern.match(line.strip()): 

890 return i 

891 

892 return None 

893 

894 

def git_base85_decode(data: bytes) -> bytes:
    """Decode Git's base85-encoded binary data.

    Git uses a custom base85 encoding with its own alphabet and line format.
    Each line starts with a length byte followed by base85-encoded data.
    The length byte is NOT a base85 digit: git encodes a decoded length of
    1..26 as 'A'..'Z' and 27..52 as 'a'..'z' (see git's apply.c/base85.c).

    Args:
        data: Base85-encoded data as bytes (may contain multiple lines)

    Returns:
        Decoded binary data

    Raises:
        ValueError: If the data contains an invalid base85 character
    """
    # Git's base85 alphabet (different from RFC 1924)
    alphabet = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~"

    decode_table = {c: i for i, c in enumerate(alphabet)}

    result = bytearray()

    for line in data.strip().split(b"\n"):
        if not line:
            continue

        # First character encodes the decoded length for this line using
        # git's letter scheme, not the base85 digit table. Decoding it via
        # the digit table (as a naive implementation might) mis-sizes every
        # line, e.g. 'B' (length 2) would decode as 11.
        prefix = line[0]
        if ord("A") <= prefix <= ord("Z"):
            encoded_len = prefix - ord("A") + 1
        elif ord("a") <= prefix <= ord("z"):
            encoded_len = prefix - ord("a") + 27
        else:
            # Unknown/invalid prefix: skip the line (lenient behavior).
            continue

        encoded_data = line[1:]

        # Process in groups of 5 characters (each encodes 4 bytes as a
        # big-endian 32-bit value).
        i = 0
        decoded_this_line = 0
        while i < len(encoded_data) and decoded_this_line < encoded_len:
            group = encoded_data[i : i + 5]
            if len(group) == 0:
                break

            # Decode up to 5 base85 digits to a 32-bit value.
            value = 0
            for c in group:
                if c not in decode_table:
                    raise ValueError(f"Invalid base85 character: {chr(c)}")
                value = value * 85 + decode_table[c]

            # Emit only as many bytes as this line still owes.
            bytes_to_add = min(4, encoded_len - decoded_this_line)
            decoded_bytes = value.to_bytes(4, byteorder="big")
            result.extend(decoded_bytes[:bytes_to_add])
            decoded_this_line += bytes_to_add
            i += 5

    return bytes(result)

960 

961 

@dataclass
class PatchHunk:
    """Represents a single hunk in a unified diff.

    The fields mirror the ``@@ -old_start,old_count +new_start,new_count @@``
    hunk header of the unified diff format.

    Attributes:
        old_start: Starting line number in old file
        old_count: Number of lines in old file
        new_start: Starting line number in new file
        new_count: Number of lines in new file
        lines: List of diff lines (prefixed with ' ', '+', or '-')
    """

    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: list[bytes]

979 

980 

@dataclass
class FilePatch:
    """Represents a patch for a single file.

    Paths and contents are kept as bytes, exactly as parsed from the diff
    headers.

    Attributes:
        old_path: Path to old file (None for new files)
        new_path: Path to new file (None for deleted files)
        old_mode: Mode of old file (None for new files)
        new_mode: Mode of new file (None for deleted files)
        hunks: List of PatchHunk objects
        binary: True if this is a binary patch
        rename_from: Original path for renames (None if not a rename)
        rename_to: New path for renames (None if not a rename)
        copy_from: Source path for copies (None if not a copy)
        copy_to: Destination path for copies (None if not a copy)
        binary_old: Old binary content for binary patches (base85 encoded)
        binary_new: New binary content for binary patches (base85 encoded)
    """

    old_path: bytes | None
    new_path: bytes | None
    old_mode: int | None
    new_mode: int | None
    hunks: list[PatchHunk]
    binary: bool = False
    rename_from: bytes | None = None
    rename_to: bytes | None = None
    copy_from: bytes | None = None
    copy_to: bytes | None = None
    binary_old: bytes | None = None
    binary_new: bytes | None = None

1012 

1013 

def parse_unified_diff(diff_text: bytes) -> list[FilePatch]:
    """Parse a unified diff into FilePatch objects.

    Walks the diff line by line with a single index ``i``.  Each
    ``diff --git`` header opens a new file section; its extended headers
    (modes, rename/copy, index, ---/+++ paths, binary markers) are consumed
    first, then any ``@@`` hunks until the next file section begins.

    Args:
      diff_text: Unified diff content as bytes

    Returns:
      List of FilePatch objects
    """
    patches: list[FilePatch] = []
    lines = diff_text.split(b"\n")
    i = 0

    while i < len(lines):
        line = lines[i]

        # Look for diff header
        if line.startswith(b"diff --git "):
            # Parse file patch: reset per-file state
            old_path = None
            new_path = None
            old_mode = None
            new_mode = None
            hunks: list[PatchHunk] = []
            binary = False
            rename_from = None
            rename_to = None
            copy_from = None
            copy_to = None
            binary_old = None
            binary_new = None

            # Parse extended headers.  This loop ends via ``break`` once the
            # "+++" line, a binary marker, or an unrecognized line is seen.
            i += 1
            while i < len(lines):
                line = lines[i]

                if line.startswith(b"old file mode "):
                    # Mode is octal, e.g. "100644"
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"new file mode "):
                    new_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"deleted file mode "):
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"new mode "):
                    new_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"old mode "):
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"rename from "):
                    rename_from = line[12:].strip()
                    i += 1
                elif line.startswith(b"rename to "):
                    rename_to = line[10:].strip()
                    i += 1
                elif line.startswith(b"copy from "):
                    copy_from = line[10:].strip()
                    i += 1
                elif line.startswith(b"copy to "):
                    copy_to = line[8:].strip()
                    i += 1
                elif line.startswith(b"similarity index "):
                    # Just skip similarity index for now
                    i += 1
                elif line.startswith(b"dissimilarity index "):
                    # Just skip dissimilarity index for now
                    i += 1
                elif line.startswith(b"index "):
                    # Blob SHA line; not needed for applying the patch
                    i += 1
                elif line.startswith(b"--- "):
                    # Parse old file path (anything after a tab is metadata)
                    path = line[4:].split(b"\t")[0]
                    if path != b"/dev/null":
                        old_path = path
                    i += 1
                elif line.startswith(b"+++ "):
                    # Parse new file path; this ends the header section
                    path = line[4:].split(b"\t")[0]
                    if path != b"/dev/null":
                        new_path = path
                    i += 1
                    break
                elif line.startswith(b"Binary files"):
                    # Old-style binary marker without embedded patch data
                    binary = True
                    i += 1
                    break
                elif line.startswith(b"GIT binary patch"):
                    binary = True
                    i += 1
                    # Parse binary patch data ("literal"/"delta" sections of
                    # base85-encoded lines, terminated by a blank line or the
                    # next section/file header)
                    while i < len(lines):
                        line = lines[i]
                        if line.startswith(b"literal "):
                            # New binary data
                            # size = int(line[8:].strip())  # Size information, not currently used
                            i += 1
                            binary_data = b""
                            while i < len(lines):
                                line = lines[i]
                                if (
                                    line.startswith(
                                        (b"literal ", b"delta ", b"diff --git ")
                                    )
                                    or not line.strip()
                                ):
                                    break
                                binary_data += line + b"\n"
                                i += 1
                            binary_new = binary_data
                        elif line.startswith(b"delta "):
                            # Delta patch (not supported yet) - skip its lines
                            i += 1
                            while i < len(lines):
                                line = lines[i]
                                if (
                                    line.startswith(
                                        (b"literal ", b"delta ", b"diff --git ")
                                    )
                                    or not line.strip()
                                ):
                                    break
                                i += 1
                        else:
                            break
                    break
                else:
                    # Unrecognized header line: consume it and stop scanning
                    # headers for this file
                    i += 1
                    break

            # Parse hunks (text patches only; binary sections have none)
            if not binary:
                while i < len(lines):
                    line = lines[i]

                    if line.startswith(b"@@ "):
                        # Parse hunk header; counts default to 1 when the
                        # ",count" part is omitted, per unified-diff format
                        match = re.match(
                            rb"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line
                        )
                        if match:
                            old_start = int(match.group(1))
                            old_count = int(match.group(2)) if match.group(2) else 1
                            new_start = int(match.group(3))
                            new_count = int(match.group(4)) if match.group(4) else 1

                            # Parse hunk lines: context (" "), additions ("+"),
                            # deletions ("-") and "\ No newline" markers
                            hunk_lines: list[bytes] = []
                            i += 1
                            while i < len(lines):
                                line = lines[i]
                                if line.startswith((b" ", b"+", b"-", b"\\")):
                                    hunk_lines.append(line)
                                    i += 1
                                else:
                                    break

                            hunks.append(
                                PatchHunk(
                                    old_start=old_start,
                                    old_count=old_count,
                                    new_start=new_start,
                                    new_count=new_count,
                                    lines=hunk_lines,
                                )
                            )
                        else:
                            i += 1
                    elif line.startswith(b"diff --git "):
                        # Next file patch
                        break
                    else:
                        i += 1
                        if not line.strip():
                            # Empty line, might be end of patch or separator
                            break

            patches.append(
                FilePatch(
                    old_path=old_path,
                    new_path=new_path,
                    old_mode=old_mode,
                    new_mode=new_mode,
                    hunks=hunks,
                    binary=binary,
                    rename_from=rename_from,
                    rename_to=rename_to,
                    copy_from=copy_from,
                    copy_to=copy_to,
                    binary_old=binary_old,
                    binary_new=binary_new,
                )
            )
        else:
            i += 1

    return patches

1213 

1214 

def apply_patch_hunks(
    patch: FilePatch,
    original_lines: list[bytes],
) -> list[bytes] | None:
    """Apply patch hunks to file content.

    Args:
      patch: FilePatch object to apply
      original_lines: Original file content as list of lines (with newlines,
        except possibly the last line)

    Returns:
      Patched file content as list of lines, or None if patch cannot be applied
    """
    result = original_lines[:]
    offset = 0  # Cumulative line-count drift introduced by earlier hunks

    for hunk in patch.hunks:
        # Adjust hunk position by offset
        # old_start is 1-indexed; 0 means the hunk inserts at the beginning
        target_line = max(hunk.old_start - 1, 0) + offset

        # Extract old and new content from hunk
        old_content: list[bytes] = []
        new_content: list[bytes] = []
        # Lists that received the most recent diff line, so that a following
        # "\ No newline at end of file" marker can strip that line's newline.
        last_targets: list[list[bytes]] = []

        for line in hunk.lines:
            if line.startswith(b"\\"):
                # "\ No newline at end of file": the preceding diff line has
                # no trailing newline in the corresponding file version, so
                # undo the normalization below for it.  Without this, a patch
                # touching the final newline-less line of a file could never
                # match its context.
                for target in last_targets:
                    if target and target[-1].endswith(b"\n"):
                        target[-1] = target[-1][:-1]
                continue
            # Normalize: ensure each captured line carries a newline; the
            # marker handling above removes it again where appropriate.
            content = line[1:]
            if not content.endswith(b"\n"):
                content += b"\n"
            if line.startswith(b" "):
                # Context line - present in both old and new content
                old_content.append(content)
                new_content.append(content)
                last_targets = [old_content, new_content]
            elif line.startswith(b"-"):
                # Deletion - only in old content
                old_content.append(content)
                last_targets = [old_content]
            elif line.startswith(b"+"):
                # Addition - only in new content
                new_content.append(content)
                last_targets = [new_content]
            else:
                # Unrecognized prefix: ignored, and it breaks the adjacency
                # required by a following "\ No newline" marker
                last_targets = []

        # Verify the hunk fits inside the file
        if target_line < 0 or target_line + len(old_content) > len(result):
            # TODO: Implement fuzzy matching
            return None

        # Verify context matches exactly
        for i, old_line in enumerate(old_content):
            if result[target_line + i] != old_line:
                # Context doesn't match
                # TODO: Implement fuzzy matching
                return None

        # Apply the patch
        result[target_line : target_line + len(old_content)] = new_content

        # Update offset for next hunk
        offset += len(new_content) - len(old_content)

    return result

1282 

1283 

def _apply_rename_or_copy(
    r: "Repo",
    src_path: bytes,
    dst_path: bytes,
    strip: int,
    patch: FilePatch,
    is_rename: bool,
    cached: bool,
    check: bool,
) -> tuple[list[bytes] | None, bool]:
    """Apply a rename or copy operation.

    Reads the source content from the working tree (falling back to the
    index), and either returns it for hunk application or performs a pure
    rename/copy directly (writing the destination and updating the index).

    Args:
      r: Repository object
      src_path: Source path
      dst_path: Destination path
      strip: Number of path components to strip
      patch: FilePatch object
      is_rename: True for rename, False for copy
      cached: Apply to index only, not working tree
      check: Check only, don't apply

    Returns:
      A tuple of (``original_lines``, ``should_continue``) where:
        - ``original_lines``: Content lines if hunks need to be applied, None otherwise
        - ``should_continue``: True to skip to next patch, False to continue processing

    Raises:
      ValueError: If the source cannot be read (missing or conflicted).
    """
    from .index import ConflictedIndexEntry, IndexEntry, index_entry_from_stat

    # Strip path components (git-apply -p semantics); paths shorter than
    # ``strip`` components are left untouched
    src_stripped = src_path
    dst_stripped = dst_path
    if strip > 0:
        src_parts = src_path.split(b"/")
        if len(src_parts) > strip:
            src_stripped = b"/".join(src_parts[strip:])
        dst_parts = dst_path.split(b"/")
        if len(dst_parts) > strip:
            dst_stripped = b"/".join(dst_parts[strip:])

    repo_path_bytes = r.path.encode("utf-8") if isinstance(r.path, str) else r.path
    src_fs_path = os.path.join(repo_path_bytes, src_stripped)
    dst_fs_path = os.path.join(repo_path_bytes, dst_stripped)

    # Read content from source file; prefer the working tree, fall back to
    # the index (e.g. for --cached operation)
    op_name = "rename" if is_rename else "copy"
    if os.path.exists(src_fs_path):
        with open(src_fs_path, "rb") as f:
            content = f.read()
    else:
        # Try to read from index
        index = r.open_index()
        if src_stripped in index:
            entry = index[src_stripped]
            if not isinstance(entry, ConflictedIndexEntry):
                obj = r.object_store[entry.sha]
                if isinstance(obj, Blob):
                    content = obj.data
                else:
                    raise ValueError(
                        f"Cannot {op_name}: source {src_stripped.decode('utf-8', errors='replace')} not found"
                    )
            else:
                raise ValueError(
                    f"Cannot {op_name}: source {src_stripped.decode('utf-8', errors='replace')} is conflicted"
                )
        else:
            raise ValueError(
                f"Cannot {op_name}: source {src_stripped.decode('utf-8', errors='replace')} not found"
            )

    # If there are hunks, return content as lines for further processing
    # by the caller (rename/copy with modifications)
    if patch.hunks:
        return content.splitlines(keepends=True), False

    # No hunks - pure rename/copy
    if check:
        return None, True

    # Write to destination (working tree), honoring any new mode
    if not cached:
        os.makedirs(os.path.dirname(dst_fs_path), exist_ok=True)
        with open(dst_fs_path, "wb") as f:
            f.write(content)
        if patch.new_mode is not None:
            os.chmod(dst_fs_path, patch.new_mode)

    # Update index: store the blob and stage the destination entry
    index = r.open_index()
    blob = Blob.from_string(content)
    r.object_store.add_object(blob)

    if not cached and os.path.exists(dst_fs_path):
        st = os.stat(dst_fs_path)
        entry = index_entry_from_stat(st, blob.id, 0)
    else:
        # Minimal synthetic entry for index-only (--cached) operation
        entry = IndexEntry(
            ctime=(0, 0),
            mtime=(0, 0),
            dev=0,
            ino=0,
            mode=patch.new_mode or 0o100644,
            uid=0,
            gid=0,
            size=len(content),
            sha=blob.id,
            flags=0,
        )

    index[dst_stripped] = entry

    # For renames, remove the old file from both working tree and index
    if is_rename:
        if not cached and os.path.exists(src_fs_path):
            os.remove(src_fs_path)
        if src_stripped in index:
            del index[src_stripped]

    index.write()
    return None, True

1405 

def apply_patches(
    r: "Repo",
    patches: list[FilePatch],
    cached: bool = False,
    reverse: bool = False,
    check: bool = False,
    strip: int = 1,
    three_way: bool = False,
) -> None:
    """Apply a list of file patches to a repository.

    For each patch this resolves the target path, handles renames/copies and
    binary patches, reads the original content (working tree, then index),
    applies the hunks (optionally reversed or via 3-way merge fallback), and
    writes the result to the working tree and/or the index.

    Args:
      r: Repository object
      patches: List of FilePatch objects to apply
      cached: Apply patch to index only, not working tree
      reverse: Apply patch in reverse
      check: Only check if patch can be applied, don't apply
      strip: Number of leading path components to strip (default: 1)
      three_way: Fall back to 3-way merge if patch does not apply cleanly

    Raises:
      ValueError: If patch cannot be applied
    """
    from .index import ConflictedIndexEntry, IndexEntry, index_entry_from_stat

    for patch in patches:
        # Determine the file path
        # For renames/copies without hunks, old_path/new_path may be None
        # Use local variables to avoid mutating the patch object
        old_path = patch.old_path
        new_path = patch.new_path

        if new_path is None and old_path is None:
            if patch.rename_to is not None:
                # Use rename_to for the target path
                new_path = patch.rename_to
                old_path = patch.rename_from
            elif patch.copy_to is not None:
                # Use copy_to for the target path
                new_path = patch.copy_to
                old_path = patch.copy_from
            else:
                raise ValueError("Patch has no file path")

        # Choose path based on operation
        file_path: bytes
        if new_path is None:
            # Deletion
            if old_path is None:
                raise ValueError("Patch has no file path")
            file_path = old_path
        elif old_path is None:
            # Addition
            file_path = new_path
        else:
            # Modification (use new path)
            file_path = new_path

        # Strip path components (git-apply -p semantics)
        if strip > 0:
            parts = file_path.split(b"/")
            if len(parts) > strip:
                file_path = b"/".join(parts[strip:])

        # Convert to filesystem path
        tree_path = file_path
        fs_path = os.path.join(
            r.path.encode("utf-8") if isinstance(r.path, str) else r.path, file_path
        )

        # Handle renames and copies; the helper either finishes the
        # operation (should_continue=True) or hands back the source content
        # so the hunks below can be applied on top of it
        original_lines: list[bytes] | None = None
        if patch.rename_from is not None and patch.rename_to is not None:
            original_lines, should_continue = _apply_rename_or_copy(
                r,
                patch.rename_from,
                patch.rename_to,
                strip,
                patch,
                is_rename=True,
                cached=cached,
                check=check,
            )
            if should_continue:
                continue
        elif patch.copy_from is not None and patch.copy_to is not None:
            original_lines, should_continue = _apply_rename_or_copy(
                r,
                patch.copy_from,
                patch.copy_to,
                strip,
                patch,
                is_rename=False,
                cached=cached,
                check=check,
            )
            if should_continue:
                continue

        # Handle binary patches
        if patch.binary:
            if patch.binary_new is not None:
                # Decode binary patch
                try:
                    binary_content = git_base85_decode(patch.binary_new)
                except (ValueError, KeyError) as e:
                    raise ValueError(f"Failed to decode binary patch: {e}")

                if check:
                    # Just checking, don't actually apply
                    continue

                # Write binary file to the working tree
                if not cached:
                    os.makedirs(os.path.dirname(fs_path), exist_ok=True)
                    with open(fs_path, "wb") as f:
                        f.write(binary_content)
                    if patch.new_mode is not None:
                        os.chmod(fs_path, patch.new_mode)

                # Update index
                index = r.open_index()
                blob = Blob.from_string(binary_content)
                r.object_store.add_object(blob)

                if not cached and os.path.exists(fs_path):
                    st = os.stat(fs_path)
                    entry = index_entry_from_stat(st, blob.id, 0)
                else:
                    # Minimal synthetic entry for index-only operation
                    entry = IndexEntry(
                        ctime=(0, 0),
                        mtime=(0, 0),
                        dev=0,
                        ino=0,
                        mode=patch.new_mode or 0o100644,
                        uid=0,
                        gid=0,
                        size=len(binary_content),
                        sha=blob.id,
                        flags=0,
                    )

                index[tree_path] = entry
                index.write()
                continue
            else:
                # Old-style "Binary files differ" message without actual patch data
                raise NotImplementedError(
                    "Binary patch detected but no patch data provided (use git diff --binary)"
                )

        # Read original file content (unless already loaded from rename/copy)
        if original_lines is None:
            if patch.old_path is None:
                # New file
                original_lines = []
            else:
                if os.path.exists(fs_path):
                    with open(fs_path, "rb") as f:
                        content = f.read()
                    original_lines = content.splitlines(keepends=True)
                else:
                    # File doesn't exist - check if it's in the index
                    try:
                        index = r.open_index()
                        if tree_path in index:
                            index_entry: IndexEntry | ConflictedIndexEntry = index[
                                tree_path
                            ]
                            if not isinstance(index_entry, ConflictedIndexEntry):
                                obj = r.object_store[index_entry.sha]
                                if isinstance(obj, Blob):
                                    original_lines = obj.data.splitlines(keepends=True)
                                else:
                                    original_lines = []
                            else:
                                original_lines = []
                        else:
                            original_lines = []
                    except (KeyError, FileNotFoundError):
                        original_lines = []

        # Reverse patch if requested
        # NOTE: this mutates the hunks in place, so the patch objects are
        # consumed by a reverse application
        if reverse:
            # Swap old and new in hunks
            for hunk in patch.hunks:
                hunk.old_start, hunk.new_start = hunk.new_start, hunk.old_start
                hunk.old_count, hunk.new_count = hunk.new_count, hunk.old_count
                # Swap +/- prefixes
                reversed_lines = []
                for line in hunk.lines:
                    if line.startswith(b"+"):
                        reversed_lines.append(b"-" + line[1:])
                    elif line.startswith(b"-"):
                        reversed_lines.append(b"+" + line[1:])
                    else:
                        reversed_lines.append(line)
                hunk.lines = reversed_lines

        # Apply the patch
        assert original_lines is not None
        result = apply_patch_hunks(patch, original_lines)

        if result is None and three_way:
            # Try 3-way merge fallback
            from .merge import merge_blobs

            # Reconstruct base version from the patch
            # Base is what you get by taking only the old lines from hunks
            base_lines = []
            theirs_lines = []

            for hunk in patch.hunks:
                for line in hunk.lines:
                    if line.startswith(b"\\"):
                        # Skip "\ No newline at end of file" markers
                        continue
                    elif line.startswith(b" "):
                        # Context line - in both base and theirs
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        base_lines.append(content)
                        theirs_lines.append(content)
                    elif line.startswith(b"-"):
                        # Deletion - only in base
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        base_lines.append(content)
                    elif line.startswith(b"+"):
                        # Addition - only in theirs
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        theirs_lines.append(content)

            # Create blobs for merging
            base_content = b"".join(base_lines)
            ours_content = b"".join(original_lines)
            theirs_content = b"".join(theirs_lines)

            base_blob = Blob.from_string(base_content) if base_content else None
            ours_blob = Blob.from_string(ours_content) if ours_content else None
            theirs_blob = Blob.from_string(theirs_content)

            # Perform 3-way merge
            merged_content, _had_conflicts = merge_blobs(
                base_blob, ours_blob, theirs_blob, path=tree_path
            )

            result = merged_content.splitlines(keepends=True)

            # Note: if _had_conflicts is True, the result contains conflict markers
            # Git would exit with error code, but we continue processing
        elif result is None:
            raise PatchApplicationFailure(
                f"Patch does not apply to {file_path.decode('utf-8', errors='replace')}"
            )

        if check:
            # Just checking, don't actually apply
            continue

        # Write result
        result_content = b"".join(result)

        if patch.new_path is None:
            # File deletion
            if not cached and os.path.exists(fs_path):
                os.remove(fs_path)
            # Remove from index
            index = r.open_index()
            if tree_path in index:
                del index[tree_path]
            index.write()
        else:
            # File addition or modification
            if not cached:
                # Write to working tree
                os.makedirs(os.path.dirname(fs_path), exist_ok=True)
                with open(fs_path, "wb") as f:
                    f.write(result_content)

                # Update file mode if specified
                if patch.new_mode is not None:
                    os.chmod(fs_path, patch.new_mode)

            # Update index
            index = r.open_index()
            blob = Blob.from_string(result_content)
            r.object_store.add_object(blob)

            # Get file stat for index entry
            if not cached and os.path.exists(fs_path):
                st = os.stat(fs_path)
                entry = index_entry_from_stat(st, blob.id, 0)
            else:
                # Create a minimal index entry for cached-only changes
                entry = IndexEntry(
                    ctime=(0, 0),
                    mtime=(0, 0),
                    dev=0,
                    ino=0,
                    mode=patch.new_mode or 0o100644,
                    uid=0,
                    gid=0,
                    size=len(result_content),
                    sha=blob.id,
                    flags=0,
                )

            index[tree_path] = entry

            # Handle cleanup for renames with hunks (the no-hunk case was
            # fully handled by _apply_rename_or_copy above)
            if patch.rename_from is not None and patch.rename_to is not None:
                # Remove old file after successful rename
                old_rename_path = patch.rename_from
                if strip > 0:
                    old_parts = old_rename_path.split(b"/")
                    if len(old_parts) > strip:
                        old_rename_path = b"/".join(old_parts[strip:])

                old_fs_path = os.path.join(
                    r.path.encode("utf-8") if isinstance(r.path, str) else r.path,
                    old_rename_path,
                )

                if not cached and os.path.exists(old_fs_path):
                    os.remove(old_fs_path)
                if old_rename_path in index:
                    del index[old_rename_path]

            index.write()

1741 

def mailinfo(
    msg: email.message.Message | BinaryIO | TextIO,
    keep_subject: bool = False,
    keep_non_patch: bool = False,
    encoding: str | None = None,
    scissors: bool = False,
    message_id: bool = False,
) -> MailinfoResult:
    """Extract patch information from an email message.

    This function parses an email message and extracts commit metadata
    (author, email, subject) and separates the commit message from the
    patch content, similar to git mailinfo.

    Args:
      msg: Email message (email.message.Message object) or file handle to read from
      keep_subject: If True, keep subject intact without munging (-k)
      keep_non_patch: If True, only strip [PATCH] from brackets (-b)
      encoding: Character encoding to use (default: detect from message)
      scissors: If True, remove everything before scissors line
      message_id: If True, include Message-ID in commit message (-m)

    Returns:
      MailinfoResult with parsed information

    Raises:
      ValueError: If message is malformed or missing required fields
    """
    # Accept either an already-parsed message or a file-like object
    mail: email.message.Message
    if isinstance(msg, email.message.Message):
        mail = msg
    elif hasattr(msg, "read"):
        raw = msg.read()
        if isinstance(raw, bytes):
            mail = email.parser.BytesParser().parsebytes(raw)
        else:
            mail = email.parser.Parser().parsestr(raw)
    else:
        raise ValueError("msg must be an email.message.Message or file-like object")

    # Fall back to the charset declared by the message itself
    if encoding is None:
        encoding = mail.get_content_charset() or "utf-8"

    # Author information comes from the mandatory From header
    from_header = mail.get("From", "")
    if not from_header:
        raise ValueError("Email message missing 'From' header")

    author_name, author_email = email.utils.parseaddr(from_header)
    if not author_email:
        raise ValueError(
            f"Could not parse email address from 'From' header: {from_header}"
        )

    # Date header is optional; empty values collapse to None
    date_header = mail.get("Date")
    author_date = date_header if date_header else None

    # Subject: flatten to a single line, then munge per the -k/-b flags
    subject = mail.get("Subject", "")
    if not subject:
        subject = "(no subject)"
    subject = str(subject).replace("\n", " ").replace("\r", " ")
    subject = _munge_subject(subject, keep_subject, keep_non_patch)

    # Message-ID is only extracted when it should end up in the message
    msg_id = mail.get("Message-ID") if message_id else None

    # Body: normalize the payload to bytes
    payload = mail.get_payload(decode=True)
    if payload is None:
        payload = b""
    elif isinstance(payload, str):
        payload = payload.encode(encoding)
    elif not isinstance(payload, bytes):
        # Handle multipart or other payload types
        payload = str(payload).encode(encoding)

    body_lines = payload.splitlines(keepends=True)

    # Scissors: drop everything up to and including the scissors line
    if scissors:
        cut_at = _find_scissors_line(body_lines)
        if cut_at is not None:
            body_lines = body_lines[cut_at + 1 :]

    # Everything before the "---" separator is commit message; everything
    # after it (up to a "-- " signature marker) is the patch itself
    message_parts: list[bytes] = []
    patch_parts: list[bytes] = []
    in_patch = False

    for body_line in body_lines:
        if in_patch:
            if body_line == b"-- \n":
                # Signature marker terminates the patch section
                break
            patch_parts.append(body_line)
        elif body_line == b"---\n":
            in_patch = True
            patch_parts.append(body_line)
        else:
            message_parts.append(body_line)

    # Assemble and clean the commit message
    commit_message = (
        b"".join(message_parts).decode(encoding, errors="replace").strip()
    )

    # Append Message-ID when requested and present
    if message_id and msg_id:
        if commit_message:
            commit_message += "\n\n"
        commit_message += f"Message-ID: {msg_id}"

    patch_content = b"".join(patch_parts).decode(encoding, errors="replace")

    return MailinfoResult(
        author_name=author_name,
        author_email=author_email,
        author_date=author_date,
        subject=subject,
        message=commit_message,
        patch=patch_content,
        message_id=msg_id,
    )