Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 9%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# patch.py -- For dealing with packed-style patches.
2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Classes for dealing with git am-style patches.
24These patches are basically unified diffs with some extra metadata tacked
25on.
26"""
# Public API of this module; names listed here are exported by
# ``from dulwich.patch import *``.
__all__ = [
    "DEFAULT_DIFF_ALGORITHM",
    "FIRST_FEW_BYTES",
    "DiffAlgorithmNotAvailable",
    "MailinfoResult",
    "PatchApplicationFailure",
    "apply_patch_hunks",
    "apply_patches",
    "commit_patch_id",
    "gen_diff_header",
    "get_summary",
    "git_am_patch_split",
    "is_binary",
    "mailinfo",
    "parse_patch_message",
    "patch_filename",
    "patch_id",
    "shortid",
    "unified_diff",
    "unified_diff_with_algorithm",
    "write_blob_diff",
    "write_commit_patch",
    "write_object_diff",
    "write_tree_diff",
]
54import email.message
55import email.parser
56import email.utils
57import os
58import re
59import time
60from collections.abc import Generator, Sequence
61from dataclasses import dataclass
62from difflib import SequenceMatcher
63from typing import (
64 IO,
65 TYPE_CHECKING,
66 BinaryIO,
67 TextIO,
68)
70if TYPE_CHECKING:
71 from .object_store import BaseObjectStore
72 from .repo import Repo
74from .objects import S_ISGITLINK, Blob, Commit, ObjectID, RawObjectID
# Number of leading bytes examined when heuristically classifying content
# as binary (see is_binary()).
FIRST_FEW_BYTES = 8000

# Diff algorithm used when callers do not request one explicitly.
DEFAULT_DIFF_ALGORITHM = "myers"
class PatchApplicationFailure(Exception):
    """Exception raised when a patch cannot be applied cleanly to its target."""
class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
          algorithm: Name of the unavailable algorithm
          install_hint: Optional installation hint appended to the message
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        # Build the message in one place; the hint is optional.
        message = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            message = f"{message} {install_hint}"
        super().__init__(message)
107def write_commit_patch(
108 f: IO[bytes],
109 commit: "Commit",
110 contents: str | bytes,
111 progress: tuple[int, int],
112 version: str | None = None,
113 encoding: str | None = None,
114) -> None:
115 """Write a individual file patch.
117 Args:
118 f: File-like object to write to
119 commit: Commit object
120 contents: Contents of the patch
121 progress: tuple with current patch number and total.
122 version: Version string to include in patch header
123 encoding: Encoding to use for the patch
125 Returns:
126 tuple with filename and contents
127 """
128 encoding = encoding or getattr(f, "encoding", "ascii")
129 if encoding is None:
130 encoding = "ascii"
131 if isinstance(contents, str):
132 contents = contents.encode(encoding)
133 (num, total) = progress
134 f.write(
135 b"From "
136 + commit.id
137 + b" "
138 + time.ctime(commit.commit_time).encode(encoding)
139 + b"\n"
140 )
141 f.write(b"From: " + commit.author + b"\n")
142 f.write(
143 b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
144 )
145 f.write(
146 (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
147 )
148 f.write(b"\n")
149 f.write(b"---\n")
150 try:
151 import subprocess
153 p = subprocess.Popen(
154 ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
155 )
156 except (ImportError, OSError):
157 pass # diffstat not available?
158 else:
159 (diffstat, _) = p.communicate(contents)
160 f.write(diffstat)
161 f.write(b"\n")
162 f.write(contents)
163 f.write(b"-- \n")
164 if version is None:
165 from dulwich import __version__ as dulwich_version
167 f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
168 else:
169 if encoding is None:
170 encoding = "ascii"
171 f.write(version.encode(encoding) + b"\n")
def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: First message line with spaces replaced by dashes, or the
      empty string for an empty message.
    """
    message_lines = commit.message.decode(errors="replace").splitlines()
    if not message_lines:
        return ""
    return message_lines[0].replace(" ", "-")
186# Unified Diff
187def _format_range_unified(start: int, stop: int) -> str:
188 """Convert range to the "ed" format."""
189 # Per the diff spec at http://www.unix.org/single_unix_specification/
190 beginning = start + 1 # lines start numbering with one
191 length = stop - start
192 if length == 1:
193 return f"{beginning}"
194 if not length:
195 beginning -= 1 # empty ranges begin at line just before the range
196 return f"{beginning},{length}"
def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def _fmt_range(start: int, stop: int) -> str:
        # Hunk-header range in "ed" format (lines numbered from one).
        length = stop - start
        first = start + 1
        if length == 1:
            return str(first)
        if not length:
            first -= 1
        return f"{first},{length}"

    def _with_newline_marker(line: bytes) -> bytes:
        # Mimic git's marker for a final line that lacks a newline.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_emitted = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not header_emitted:
            header_emitted = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        old_range = _fmt_range(group[0][1], group[-1][2])
        new_range = _fmt_range(group[0][3], group[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + _with_newline_marker(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + _with_newline_marker(line)
250def _get_sequence_matcher(
251 algorithm: str, a: Sequence[bytes], b: Sequence[bytes]
252) -> SequenceMatcher[bytes]:
253 """Get appropriate sequence matcher for the given algorithm.
255 Args:
256 algorithm: Diff algorithm ("myers" or "patience")
257 a: First sequence
258 b: Second sequence
260 Returns:
261 Configured sequence matcher instance
263 Raises:
264 DiffAlgorithmNotAvailable: If patience requested but not available
265 """
266 if algorithm == "patience":
267 try:
268 from patiencediff import PatienceSequenceMatcher
270 return PatienceSequenceMatcher(None, a, b) # type: ignore[no-any-return,unused-ignore]
271 except ImportError:
272 raise DiffAlgorithmNotAvailable(
273 "patience", "Install with: pip install 'dulwich[patiencediff]'"
274 )
275 else:
276 return SequenceMatcher(a=a, b=b)
def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: str | None = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience"); None selects
        DEFAULT_DIFF_ALGORITHM

    Returns:
      Generator yielding diff lines

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
    """
    matcher = _get_sequence_matcher(
        algorithm if algorithm is not None else DEFAULT_DIFF_ALGORITHM, a, b
    )

    def _with_newline_marker(line: bytes) -> bytes:
        # Mimic git's marker for a final line that lacks a newline.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_emitted = False
    for group in matcher.get_grouped_opcodes(n):
        # The "---"/"+++" header goes out once, before the first hunk.
        if not header_emitted:
            header_emitted = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        old_range = _format_range_unified(group[0][1], group[-1][2])
        new_range = _format_range_unified(group[0][3], group[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + _with_newline_marker(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + _with_newline_marker(line)
def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content

    Returns:
      True when a NUL byte occurs within the first FIRST_FEW_BYTES bytes.
    """
    # find() with an end bound avoids slicing a copy of the prefix.
    return content.find(b"\0", 0, FIRST_FEW_BYTES) != -1
362def shortid(hexsha: bytes | None) -> bytes:
363 """Get short object ID.
365 Args:
366 hexsha: Full hex SHA or None
368 Returns:
369 7-character short ID
370 """
371 if hexsha is None:
372 return b"0" * 7
373 else:
374 return hexsha[:7]
377def patch_filename(p: bytes | None, root: bytes) -> bytes:
378 """Generate patch filename.
380 Args:
381 p: Path or None
382 root: Root directory
384 Returns:
385 Full patch filename
386 """
387 if p is None:
388 return b"/dev/null"
389 else:
390 return root + b"/" + p
def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[bytes | None, int | None, ObjectID | None],
    new_file: tuple[bytes | None, int | None, ObjectID | None],
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def _as_blob(mode: int | None, hexsha: ObjectID | None) -> Blob:
        """Materialize the content identified by mode/hexsha as a Blob."""
        if hexsha is None:
            # Nonexistent side of the diff: empty content.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Submodule entry: diff the recorded commit id, as git does.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def _blob_lines(blob: "Blob") -> list[bytes]:
        """Split blob content into lines (empty list for empty content)."""
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_blob = _as_blob(old_mode, old_id)
    new_blob = _as_blob(new_mode, new_id)
    if not diff_binary and (is_binary(old_blob.data) or is_binary(new_blob.data)):
        f.write(
            b"Binary files " + old_label + b" and " + new_label + b" differ\n"
        )
    else:
        f.writelines(
            unified_diff_with_algorithm(
                _blob_lines(old_blob),
                _blob_lines(new_blob),
                old_label,
                new_label,
                algorithm=diff_algorithm,
            )
        )
# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[bytes | None, bytes | None],
    modes: tuple[int | None, int | None],
    shas: tuple[bytes | None, bytes | None],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    # An add/delete leaves one side without a path; mirror the other side so
    # the "diff --git" line always names two files.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode suffix on the index line only appears when the mode is
    # present on both sides.
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[bytes | None, int | None, "Blob | None"],
    new_file: tuple[bytes | None, int | None, "Blob | None"],
    diff_algorithm: str | None = None,
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None entries for nonexisting)
      new_file: (path, mode, blob) tuple (None entries for nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def _blob_lines(blob: "Blob | None") -> list[bytes]:
        """Split blob content into lines (empty list for a missing blob)."""
        return [] if blob is None else blob.splitlines()

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff_with_algorithm(
            _blob_lines(old_blob),
            _blob_lines(new_blob),
            old_label,
            new_label,
            algorithm=diff_algorithm,
        )
    )
def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: ObjectID | None,
    new_tree: ObjectID | None,
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one per-file diff for every change between the two trees.
    for (old_p, new_p), (old_m, new_m), (old_s, new_s) in store.tree_changes(
        old_tree, new_tree
    ):
        write_object_diff(
            f,
            store,
            (old_p, old_m, old_s),
            (new_p, new_m, new_s),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )
def git_am_patch_split(
    f: TextIO | BinaryIO, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # Fall back to the stream's declared encoding, then ASCII.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    # Pick the byte- or text-oriented email parser to match the stream mode.
    if isinstance(contents, bytes):
        msg = email.parser.BytesParser().parsebytes(contents)
    else:
        msg = email.parser.Parser().parsestr(contents)
    return parse_patch_message(msg, encoding)
def parse_patch_message(
    msg: email.message.Message, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        # Drop the leading "[PATCH ...]" tag: the commit summary starts
        # right after the "] " that closes it.
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    # A single iterator is shared across the loops below so each section
    # picks up exactly where the previous one stopped.
    line_iter = iter(lines)

    # Body up to the "---" separator extends the commit message; a leading
    # "From: " line (only honored on the very first body line) overrides
    # the author taken from the mail headers.
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # Everything between "---" and the "-- " signature marker is the diff.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    # The line after "-- " (if present) carries the git/dulwich version.
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version
def patch_id(diff_data: bytes) -> bytes:
    """Compute patch ID for a diff.

    The patch ID is computed by normalizing the diff and computing a SHA1 hash.
    This follows git's patch-id algorithm which:
    1. Removes whitespace from lines starting with + or -
    2. Replaces line numbers in @@ headers with a canonical form
    3. Computes SHA1 of the result

    Args:
      diff_data: Raw diff data as bytes

    Returns:
      SHA1 hash of normalized diff (40-byte hex string)

    TODO: This implementation uses a simple line-by-line approach. For better
    compatibility with git's patch-id, consider using proper patch parsing that:
    - Handles edge cases in diff format (binary diffs, mode changes, etc.)
    - Properly parses unified diff format according to the spec
    - Matches git's exact normalization algorithm byte-for-byte
    See git's patch-id.c for reference implementation.
    """
    import hashlib
    import re

    # Per-file header lines are excluded from the hash entirely.
    skip_prefixes = (
        b"diff --git ",
        b"index ",
        b"--- ",
        b"+++ ",
        b"new file mode ",
        b"old file mode ",
        b"deleted file mode ",
        b"new mode ",
        b"old mode ",
        b"similarity index ",
        b"dissimilarity index ",
        b"rename from ",
        b"rename to ",
        b"copy from ",
        b"copy to ",
    )
    hunk_header_re = re.compile(rb"^@@\s+-\d+(?:,\d+)?\s+\+\d+(?:,\d+)?\s+@@")

    normalized: list[bytes] = []
    for line in diff_data.split(b"\n"):
        if line.startswith(skip_prefixes):
            continue
        # Hunk headers collapse to a canonical "@@" so that line-number
        # shifts do not change the patch id.
        if line.startswith(b"@@") and hunk_header_re.match(line):
            normalized.append(b"@@")
            continue
        # Changed lines keep their +/- marker but lose all whitespace.
        if line.startswith((b"+", b"-")):
            content = line[1:].replace(b" ", b"").replace(b"\t", b"")
            normalized.append(line[:1] + content)
            continue
        # Context lines and blank lines are kept verbatim.
        if line.startswith(b" ") or line == b"":
            normalized.append(line)

    return hashlib.sha1(b"\n".join(normalized)).hexdigest().encode("ascii")
def commit_patch_id(
    store: "BaseObjectStore", commit_id: ObjectID | RawObjectID
) -> bytes:
    """Compute patch ID for a commit.

    Args:
      store: Object store to read objects from
      commit_id: Commit ID (40-byte hex string)

    Returns:
      Patch ID (40-byte hex string)
    """
    from io import BytesIO

    commit = store[commit_id]
    assert isinstance(commit, Commit)

    if commit.parents:
        # Diff against the first parent's tree.
        first_parent = store[commit.parents[0]]
        assert isinstance(first_parent, Commit)
        base_tree = first_parent.tree
    else:
        # Root commit - compare against empty tree
        base_tree = None

    buf = BytesIO()
    write_tree_diff(buf, store, base_tree, commit.tree)
    return patch_id(buf.getvalue())
@dataclass
class MailinfoResult:
    """Result of mailinfo parsing.

    Attributes:
      author_name: Author's name
      author_email: Author's email address
      author_date: Author's date (if present in the email)
      subject: Processed subject line
      message: Commit message body
      patch: Patch content
      message_id: Message-ID header (if -m/--message-id was used)
    """

    author_name: str
    author_email: str
    author_date: str | None
    subject: str
    message: str
    patch: str
    # Only populated when message-id extraction was requested.
    message_id: str | None = None
822def _munge_subject(subject: str, keep_subject: bool, keep_non_patch: bool) -> str:
823 """Munge email subject line for commit message.
825 Args:
826 subject: Original subject line
827 keep_subject: If True, keep subject intact (-k option)
828 keep_non_patch: If True, only strip [PATCH] (-b option)
830 Returns:
831 Processed subject line
832 """
833 if keep_subject:
834 return subject
836 result = subject
838 # First remove Re: prefixes (they can appear before brackets)
839 while True:
840 new_result = re.sub(r"^\s*(?:re|RE|Re):\s*", "", result, flags=re.IGNORECASE)
841 if new_result == result:
842 break
843 result = new_result
845 # Remove bracketed strings
846 if keep_non_patch:
847 # Only remove brackets containing "PATCH"
848 # Match each bracket individually anywhere in the string
849 while True:
850 # Remove PATCH bracket, but be careful with whitespace
851 new_result = re.sub(
852 r"\[[^\]]*?PATCH[^\]]*?\](\s+)?", r"\1", result, flags=re.IGNORECASE
853 )
854 if new_result == result:
855 break
856 result = new_result
857 else:
858 # Remove all bracketed strings
859 while True:
860 new_result = re.sub(r"^\s*\[.*?\]\s*", "", result)
861 if new_result == result:
862 break
863 result = new_result
865 # Remove leading/trailing whitespace
866 result = result.strip()
868 # Normalize multiple whitespace to single space
869 result = re.sub(r"\s+", " ", result)
871 return result
874def _find_scissors_line(lines: list[bytes]) -> int | None:
875 """Find the scissors line in message body.
877 Args:
878 lines: List of lines in the message body
880 Returns:
881 Index of scissors line, or None if not found
882 """
883 scissors_pattern = re.compile(
884 rb"^(?:>?\s*-+\s*)?(?:8<|>8)?\s*-+\s*$|^(?:>?\s*-+\s*)(?:cut here|scissors)(?:\s*-+)?$",
885 re.IGNORECASE,
886 )
888 for i, line in enumerate(lines):
889 if scissors_pattern.match(line.strip()):
890 return i
892 return None
def git_base85_decode(data: bytes) -> bytes:
    """Decode Git's base85-encoded binary data.

    Git uses a custom base85 encoding with its own alphabet and line format.
    Each line starts with a length character followed by base85-encoded data.

    Note the length character is NOT a base85 digit: git encodes the number
    of decoded bytes on the line as 'A'-'Z' for 1-26 and 'a'-'z' for 27-52
    (see git's base85.c / apply.c).

    Args:
      data: Base85-encoded data as bytes (may contain multiple lines)

    Returns:
      Decoded binary data

    Raises:
      ValueError: If the data contains an invalid base85 character
    """
    # Git's base85 alphabet (different from RFC 1924)
    alphabet = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!#$%&()*+-;<=>?@^_`{|}~"
    decode_table = {c: i for i, c in enumerate(alphabet)}

    result = bytearray()
    for line in data.strip().split(b"\n"):
        if not line:
            continue

        # Decode the per-line length byte using git's scheme ('A'-'Z' =>
        # 1..26, 'a'-'z' => 27..52) rather than the base85 digit table.
        length_byte = line[0]
        if ord("A") <= length_byte <= ord("Z"):
            decoded_len = length_byte - ord("A") + 1
        elif ord("a") <= length_byte <= ord("z"):
            decoded_len = length_byte - ord("a") + 27
        else:
            # Not a valid length byte; skip the line (lenient, as before).
            continue

        encoded_data = line[1:]

        # Each group of 5 base85 digits encodes 4 bytes (big-endian); the
        # last group of a line may carry fewer meaningful bytes.
        i = 0
        produced = 0
        while i < len(encoded_data) and produced < decoded_len:
            group = encoded_data[i : i + 5]
            if not group:
                break

            value = 0
            for c in group:
                if c not in decode_table:
                    raise ValueError(f"Invalid base85 character: {chr(c)}")
                value = value * 85 + decode_table[c]

            take = min(4, decoded_len - produced)
            result.extend(value.to_bytes(4, byteorder="big")[:take])
            produced += take
            i += 5

    return bytes(result)
@dataclass
class PatchHunk:
    """Represents a single hunk in a unified diff.

    Attributes:
      old_start: Starting line number in old file (as given in the @@ header)
      old_count: Number of lines in old file
      new_start: Starting line number in new file (as given in the @@ header)
      new_count: Number of lines in new file
      lines: List of diff lines (prefixed with ' ', '+', or '-')
    """

    old_start: int
    old_count: int
    new_start: int
    new_count: int
    lines: list[bytes]
@dataclass
class FilePatch:
    """Represents a patch for a single file.

    Modes are stored as integers parsed from the octal text in the extended
    diff headers.

    Attributes:
      old_path: Path to old file (None for new files)
      new_path: Path to new file (None for deleted files)
      old_mode: Mode of old file (None for new files)
      new_mode: Mode of new file (None for deleted files)
      hunks: List of PatchHunk objects
      binary: True if this is a binary patch
      rename_from: Original path for renames (None if not a rename)
      rename_to: New path for renames (None if not a rename)
      copy_from: Source path for copies (None if not a copy)
      copy_to: Destination path for copies (None if not a copy)
      binary_old: Old binary content for binary patches (base85 encoded)
      binary_new: New binary content for binary patches (base85 encoded)
    """

    old_path: bytes | None
    new_path: bytes | None
    old_mode: int | None
    new_mode: int | None
    hunks: list[PatchHunk]
    binary: bool = False
    rename_from: bytes | None = None
    rename_to: bytes | None = None
    copy_from: bytes | None = None
    copy_to: bytes | None = None
    binary_old: bytes | None = None
    binary_new: bytes | None = None
def parse_unified_diff(diff_text: bytes) -> list[FilePatch]:
    """Parse a unified diff into FilePatch objects.

    The parser walks the input line-by-line with a single index ``i``:
    each "diff --git" line opens a per-file section whose extended headers
    are consumed first, followed by its hunks (for text patches) or its
    base85 payload (for "GIT binary patch" sections).

    Args:
      diff_text: Unified diff content as bytes

    Returns:
      List of FilePatch objects
    """
    patches: list[FilePatch] = []
    lines = diff_text.split(b"\n")
    i = 0

    while i < len(lines):
        line = lines[i]

        # Look for diff header
        if line.startswith(b"diff --git "):
            # Parse file patch
            old_path = None
            new_path = None
            old_mode = None
            new_mode = None
            hunks: list[PatchHunk] = []
            binary = False
            rename_from = None
            rename_to = None
            copy_from = None
            copy_to = None
            binary_old = None
            binary_new = None

            # Parse extended headers
            i += 1
            while i < len(lines):
                line = lines[i]

                if line.startswith(b"old file mode "):
                    # Modes are octal text in the header.
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"new file mode "):
                    new_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"deleted file mode "):
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"new mode "):
                    new_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"old mode "):
                    old_mode = int(line.split()[-1], 8)
                    i += 1
                elif line.startswith(b"rename from "):
                    rename_from = line[12:].strip()
                    i += 1
                elif line.startswith(b"rename to "):
                    rename_to = line[10:].strip()
                    i += 1
                elif line.startswith(b"copy from "):
                    copy_from = line[10:].strip()
                    i += 1
                elif line.startswith(b"copy to "):
                    copy_to = line[8:].strip()
                    i += 1
                elif line.startswith(b"similarity index "):
                    # Just skip similarity index for now
                    i += 1
                elif line.startswith(b"dissimilarity index "):
                    # Just skip dissimilarity index for now
                    i += 1
                elif line.startswith(b"index "):
                    i += 1
                elif line.startswith(b"--- "):
                    # Parse old file path; /dev/null means the file did not
                    # exist on this side.
                    path = line[4:].split(b"\t")[0]
                    if path != b"/dev/null":
                        old_path = path
                    i += 1
                elif line.startswith(b"+++ "):
                    # Parse new file path; this line ends the header section.
                    path = line[4:].split(b"\t")[0]
                    if path != b"/dev/null":
                        new_path = path
                    i += 1
                    break
                elif line.startswith(b"Binary files"):
                    binary = True
                    i += 1
                    break
                elif line.startswith(b"GIT binary patch"):
                    binary = True
                    i += 1
                    # Parse binary patch data
                    while i < len(lines):
                        line = lines[i]
                        if line.startswith(b"literal "):
                            # New binary data
                            # size = int(line[8:].strip())  # Size information, not currently used
                            i += 1
                            binary_data = b""
                            # Collect payload lines until the next section
                            # marker or a blank line.
                            while i < len(lines):
                                line = lines[i]
                                if (
                                    line.startswith(
                                        (b"literal ", b"delta ", b"diff --git ")
                                    )
                                    or not line.strip()
                                ):
                                    break
                                binary_data += line + b"\n"
                                i += 1
                            binary_new = binary_data
                        elif line.startswith(b"delta "):
                            # Delta patch (not supported yet)
                            i += 1
                            while i < len(lines):
                                line = lines[i]
                                if (
                                    line.startswith(
                                        (b"literal ", b"delta ", b"diff --git ")
                                    )
                                    or not line.strip()
                                ):
                                    break
                                i += 1
                        else:
                            break
                    break
                else:
                    # Unknown header line ends the header section.
                    i += 1
                    break

            # Parse hunks
            if not binary:
                while i < len(lines):
                    line = lines[i]

                    if line.startswith(b"@@ "):
                        # Parse hunk header
                        match = re.match(
                            rb"@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@", line
                        )
                        if match:
                            old_start = int(match.group(1))
                            # A missing count in the header means 1.
                            old_count = int(match.group(2)) if match.group(2) else 1
                            new_start = int(match.group(3))
                            new_count = int(match.group(4)) if match.group(4) else 1

                            # Parse hunk lines
                            hunk_lines: list[bytes] = []
                            i += 1
                            while i < len(lines):
                                line = lines[i]
                                # "\\" covers "\ No newline at end of file".
                                if line.startswith((b" ", b"+", b"-", b"\\")):
                                    hunk_lines.append(line)
                                    i += 1
                                else:
                                    break

                            hunks.append(
                                PatchHunk(
                                    old_start=old_start,
                                    old_count=old_count,
                                    new_start=new_start,
                                    new_count=new_count,
                                    lines=hunk_lines,
                                )
                            )
                        else:
                            i += 1
                    elif line.startswith(b"diff --git "):
                        # Next file patch
                        break
                    else:
                        i += 1
                        if not line.strip():
                            # Empty line, might be end of patch or separator
                            break

            patches.append(
                FilePatch(
                    old_path=old_path,
                    new_path=new_path,
                    old_mode=old_mode,
                    new_mode=new_mode,
                    hunks=hunks,
                    binary=binary,
                    rename_from=rename_from,
                    rename_to=rename_to,
                    copy_from=copy_from,
                    copy_to=copy_to,
                    binary_old=binary_old,
                    binary_new=binary_new,
                )
            )
        else:
            i += 1

    return patches
def apply_patch_hunks(
    patch: FilePatch,
    original_lines: list[bytes],
) -> list[bytes] | None:
    """Apply patch hunks to file content.

    Args:
      patch: FilePatch object to apply
      original_lines: Original file content as list of lines

    Returns:
      Patched file content as list of lines, or None if patch cannot be applied
    """
    result = original_lines[:]
    offset = 0  # Cumulative line delta introduced by previously applied hunks

    for hunk in patch.hunks:
        # Extract old and new content from hunk
        old_content: list[bytes] = []
        new_content: list[bytes] = []

        for line in hunk.lines:
            if line.startswith(b"\\"):
                # Skip "\ No newline at end of file" markers
                continue
            if not line.startswith((b" ", b"-", b"+")):
                continue
            # Normalize: every stored line carries a trailing newline so it
            # compares equal to splitlines(keepends=True) output.
            content = line[1:]
            if not content.endswith(b"\n"):
                content += b"\n"
            if line.startswith(b" "):
                # Context line - present in both old and new content
                old_content.append(content)
                new_content.append(content)
            elif line.startswith(b"-"):
                # Deletion - only in old content
                old_content.append(content)
            else:
                # Addition - only in new content
                new_content.append(content)

        # Locate the hunk.  old_start is 1-indexed.  For a pure-insertion
        # hunk (zero old count, e.g. "@@ -5,0 +6,2 @@" produced by diff -U0)
        # the old line number refers to the line *after which* the insertion
        # happens, so the 0-indexed insertion point is old_start itself
        # rather than old_start - 1.
        if old_content:
            target_line = max(hunk.old_start - 1, 0) + offset
        else:
            target_line = hunk.old_start + offset

        # Verify the old content fits within the file bounds
        if target_line < 0 or target_line + len(old_content) > len(result):
            # TODO: Implement fuzzy matching
            return None

        # Verify context matches exactly
        for i, old_line in enumerate(old_content):
            if result[target_line + i] != old_line:
                # Context doesn't match
                # TODO: Implement fuzzy matching
                return None

        # Apply the patch
        result[target_line : target_line + len(old_content)] = new_content

        # Update offset for next hunk
        offset += len(new_content) - len(old_content)

    return result
def _apply_rename_or_copy(
    r: "Repo",
    src_path: bytes,
    dst_path: bytes,
    strip: int,
    patch: FilePatch,
    is_rename: bool,
    cached: bool,
    check: bool,
) -> tuple[list[bytes] | None, bool]:
    """Apply a rename or copy operation.

    Args:
      r: Repository object
      src_path: Source path
      dst_path: Destination path
      strip: Number of path components to strip
      patch: FilePatch object
      is_rename: True for rename, False for copy
      cached: Apply to index only, not working tree
      check: Check only, don't apply

    Returns:
      A tuple of (``original_lines``, ``should_continue``) where:
        - ``original_lines``: Content lines if hunks need to be applied, None otherwise
        - ``should_continue``: True to skip to next patch, False to continue processing

    Raises:
      ValueError: If the source cannot be read (missing from both the working
        tree and the index, not a blob, or conflicted in the index)
    """
    from .index import ConflictedIndexEntry, IndexEntry, index_entry_from_stat

    # Strip leading path components (git apply -p<strip> semantics).  Paths
    # with fewer components than ``strip`` are left unchanged.
    src_stripped = src_path
    dst_stripped = dst_path
    if strip > 0:
        src_parts = src_path.split(b"/")
        if len(src_parts) > strip:
            src_stripped = b"/".join(src_parts[strip:])
        dst_parts = dst_path.split(b"/")
        if len(dst_parts) > strip:
            dst_stripped = b"/".join(dst_parts[strip:])

    # r.path may be str or bytes depending on how the repo was opened.
    repo_path_bytes = r.path.encode("utf-8") if isinstance(r.path, str) else r.path
    src_fs_path = os.path.join(repo_path_bytes, src_stripped)
    dst_fs_path = os.path.join(repo_path_bytes, dst_stripped)

    # Read content from source file: prefer the working tree, falling back
    # to the staged blob in the index.
    op_name = "rename" if is_rename else "copy"
    if os.path.exists(src_fs_path):
        with open(src_fs_path, "rb") as f:
            content = f.read()
    else:
        # Try to read from index
        index = r.open_index()
        if src_stripped in index:
            entry = index[src_stripped]
            if not isinstance(entry, ConflictedIndexEntry):
                obj = r.object_store[entry.sha]
                if isinstance(obj, Blob):
                    content = obj.data
                else:
                    # Indexed object is not a blob (e.g. a submodule entry)
                    raise ValueError(
                        f"Cannot {op_name}: source {src_stripped.decode('utf-8', errors='replace')} not found"
                    )
            else:
                raise ValueError(
                    f"Cannot {op_name}: source {src_stripped.decode('utf-8', errors='replace')} is conflicted"
                )
        else:
            raise ValueError(
                f"Cannot {op_name}: source {src_stripped.decode('utf-8', errors='replace')} not found"
            )

    # If there are hunks, return content as lines for further processing;
    # the caller applies the hunks and handles rename cleanup itself.
    if patch.hunks:
        return content.splitlines(keepends=True), False

    # No hunks - pure rename/copy
    if check:
        # Source was readable; nothing to write in check mode.
        return None, True

    # Write to destination
    if not cached:
        os.makedirs(os.path.dirname(dst_fs_path), exist_ok=True)
        with open(dst_fs_path, "wb") as f:
            f.write(content)
        if patch.new_mode is not None:
            os.chmod(dst_fs_path, patch.new_mode)

    # Update index
    index = r.open_index()
    blob = Blob.from_string(content)
    r.object_store.add_object(blob)

    if not cached and os.path.exists(dst_fs_path):
        st = os.stat(dst_fs_path)
        entry = index_entry_from_stat(st, blob.id, 0)
    else:
        # Cached-only update: synthesize a minimal index entry since there is
        # no filesystem stat to derive timestamps/device numbers from.
        entry = IndexEntry(
            ctime=(0, 0),
            mtime=(0, 0),
            dev=0,
            ino=0,
            mode=patch.new_mode or 0o100644,  # default to regular non-executable file
            uid=0,
            gid=0,
            size=len(content),
            sha=blob.id,
            flags=0,
        )

    index[dst_stripped] = entry

    # For renames, remove the old file
    if is_rename:
        if not cached and os.path.exists(src_fs_path):
            os.remove(src_fs_path)
        if src_stripped in index:
            del index[src_stripped]

    index.write()
    return None, True
def apply_patches(
    r: "Repo",
    patches: list[FilePatch],
    cached: bool = False,
    reverse: bool = False,
    check: bool = False,
    strip: int = 1,
    three_way: bool = False,
) -> None:
    """Apply a list of file patches to a repository.

    Args:
      r: Repository object
      patches: List of FilePatch objects to apply
      cached: Apply patch to index only, not working tree
      reverse: Apply patch in reverse
      check: Only check if patch can be applied, don't apply
      strip: Number of leading path components to strip (default: 1)
      three_way: Fall back to 3-way merge if patch does not apply cleanly

    Raises:
      ValueError: If patch cannot be applied
      NotImplementedError: If a binary patch carries no embedded patch data
      PatchApplicationFailure: If hunks do not apply and ``three_way`` is False
    """
    from .index import ConflictedIndexEntry, IndexEntry, index_entry_from_stat

    for patch in patches:
        # Determine the file path
        # For renames/copies without hunks, old_path/new_path may be None
        # Use local variables to avoid mutating the patch object
        old_path = patch.old_path
        new_path = patch.new_path

        if new_path is None and old_path is None:
            if patch.rename_to is not None:
                # Use rename_to for the target path
                new_path = patch.rename_to
                old_path = patch.rename_from
            elif patch.copy_to is not None:
                # Use copy_to for the target path
                new_path = patch.copy_to
                old_path = patch.copy_from
            else:
                raise ValueError("Patch has no file path")

        # Choose path based on operation
        file_path: bytes
        if new_path is None:
            # Deletion
            if old_path is None:
                raise ValueError("Patch has no file path")
            file_path = old_path
        elif old_path is None:
            # Addition
            file_path = new_path
        else:
            # Modification (use new path)
            file_path = new_path

        # Strip path components (git apply -p<strip> semantics)
        if strip > 0:
            parts = file_path.split(b"/")
            if len(parts) > strip:
                file_path = b"/".join(parts[strip:])

        # Convert to filesystem path (r.path may be str or bytes)
        tree_path = file_path
        fs_path = os.path.join(
            r.path.encode("utf-8") if isinstance(r.path, str) else r.path, file_path
        )

        # Handle renames and copies.  The helper either finishes the
        # operation itself (should_continue=True) or hands back the source
        # content so the hunks below can be applied to it.
        original_lines: list[bytes] | None = None
        if patch.rename_from is not None and patch.rename_to is not None:
            original_lines, should_continue = _apply_rename_or_copy(
                r,
                patch.rename_from,
                patch.rename_to,
                strip,
                patch,
                is_rename=True,
                cached=cached,
                check=check,
            )
            if should_continue:
                continue
        elif patch.copy_from is not None and patch.copy_to is not None:
            original_lines, should_continue = _apply_rename_or_copy(
                r,
                patch.copy_from,
                patch.copy_to,
                strip,
                patch,
                is_rename=False,
                cached=cached,
                check=check,
            )
            if should_continue:
                continue

        # Handle binary patches
        if patch.binary:
            if patch.binary_new is not None:
                # Decode binary patch
                try:
                    binary_content = git_base85_decode(patch.binary_new)
                except (ValueError, KeyError) as e:
                    raise ValueError(f"Failed to decode binary patch: {e}")

                if check:
                    # Just checking, don't actually apply
                    continue

                # Write binary file
                if not cached:
                    os.makedirs(os.path.dirname(fs_path), exist_ok=True)
                    with open(fs_path, "wb") as f:
                        f.write(binary_content)
                    if patch.new_mode is not None:
                        os.chmod(fs_path, patch.new_mode)

                # Update index
                index = r.open_index()
                blob = Blob.from_string(binary_content)
                r.object_store.add_object(blob)

                if not cached and os.path.exists(fs_path):
                    st = os.stat(fs_path)
                    entry = index_entry_from_stat(st, blob.id, 0)
                else:
                    # Cached-only: synthesize a minimal index entry since
                    # there is no filesystem stat to derive fields from
                    entry = IndexEntry(
                        ctime=(0, 0),
                        mtime=(0, 0),
                        dev=0,
                        ino=0,
                        mode=patch.new_mode or 0o100644,
                        uid=0,
                        gid=0,
                        size=len(binary_content),
                        sha=blob.id,
                        flags=0,
                    )

                index[tree_path] = entry
                index.write()
                continue
            else:
                # Old-style "Binary files differ" message without actual patch data
                raise NotImplementedError(
                    "Binary patch detected but no patch data provided (use git diff --binary)"
                )

        # Read original file content (unless already loaded from rename/copy)
        if original_lines is None:
            if patch.old_path is None:
                # New file
                original_lines = []
            else:
                if os.path.exists(fs_path):
                    with open(fs_path, "rb") as f:
                        content = f.read()
                    original_lines = content.splitlines(keepends=True)
                else:
                    # File doesn't exist - check if it's in the index
                    try:
                        index = r.open_index()
                        if tree_path in index:
                            index_entry: IndexEntry | ConflictedIndexEntry = index[
                                tree_path
                            ]
                            if not isinstance(index_entry, ConflictedIndexEntry):
                                obj = r.object_store[index_entry.sha]
                                if isinstance(obj, Blob):
                                    original_lines = obj.data.splitlines(keepends=True)
                                else:
                                    original_lines = []
                            else:
                                original_lines = []
                        else:
                            original_lines = []
                    except (KeyError, FileNotFoundError):
                        original_lines = []

        # Reverse patch if requested
        # NOTE(review): this mutates the hunks of the FilePatch in place, so
        # passing the same patch list through this function twice with
        # reverse=True would re-reverse it.
        if reverse:
            # Swap old and new in hunks
            for hunk in patch.hunks:
                hunk.old_start, hunk.new_start = hunk.new_start, hunk.old_start
                hunk.old_count, hunk.new_count = hunk.new_count, hunk.old_count
                # Swap +/- prefixes
                reversed_lines = []
                for line in hunk.lines:
                    if line.startswith(b"+"):
                        reversed_lines.append(b"-" + line[1:])
                    elif line.startswith(b"-"):
                        reversed_lines.append(b"+" + line[1:])
                    else:
                        reversed_lines.append(line)
                hunk.lines = reversed_lines

        # Apply the patch
        assert original_lines is not None
        result = apply_patch_hunks(patch, original_lines)

        if result is None and three_way:
            # Try 3-way merge fallback
            from .merge import merge_blobs

            # Reconstruct base version from the patch
            # Base is what you get by taking only the old lines from hunks
            base_lines = []
            theirs_lines = []

            for hunk in patch.hunks:
                for line in hunk.lines:
                    if line.startswith(b"\\"):
                        # Skip "\ No newline at end of file" markers
                        continue
                    elif line.startswith(b" "):
                        # Context line - in both base and theirs
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        base_lines.append(content)
                        theirs_lines.append(content)
                    elif line.startswith(b"-"):
                        # Deletion - only in base
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        base_lines.append(content)
                    elif line.startswith(b"+"):
                        # Addition - only in theirs
                        content = line[1:]
                        if not content.endswith(b"\n"):
                            content += b"\n"
                        theirs_lines.append(content)

            # Create blobs for merging
            base_content = b"".join(base_lines)
            ours_content = b"".join(original_lines)
            theirs_content = b"".join(theirs_lines)

            base_blob = Blob.from_string(base_content) if base_content else None
            ours_blob = Blob.from_string(ours_content) if ours_content else None
            theirs_blob = Blob.from_string(theirs_content)

            # Perform 3-way merge
            merged_content, _had_conflicts = merge_blobs(
                base_blob, ours_blob, theirs_blob, path=tree_path
            )

            result = merged_content.splitlines(keepends=True)

            # Note: if _had_conflicts is True, the result contains conflict markers
            # Git would exit with error code, but we continue processing
        elif result is None:
            raise PatchApplicationFailure(
                f"Patch does not apply to {file_path.decode('utf-8', errors='replace')}"
            )

        if check:
            # Just checking, don't actually apply
            continue

        # Write result
        result_content = b"".join(result)

        if patch.new_path is None:
            # File deletion
            if not cached and os.path.exists(fs_path):
                os.remove(fs_path)
            # Remove from index
            index = r.open_index()
            if tree_path in index:
                del index[tree_path]
            index.write()
        else:
            # File addition or modification
            if not cached:
                # Write to working tree
                os.makedirs(os.path.dirname(fs_path), exist_ok=True)
                with open(fs_path, "wb") as f:
                    f.write(result_content)

                # Update file mode if specified
                if patch.new_mode is not None:
                    os.chmod(fs_path, patch.new_mode)

            # Update index
            index = r.open_index()
            blob = Blob.from_string(result_content)
            r.object_store.add_object(blob)

            # Get file stat for index entry
            if not cached and os.path.exists(fs_path):
                st = os.stat(fs_path)
                entry = index_entry_from_stat(st, blob.id, 0)
            else:
                # Create a minimal index entry for cached-only changes
                entry = IndexEntry(
                    ctime=(0, 0),
                    mtime=(0, 0),
                    dev=0,
                    ino=0,
                    mode=patch.new_mode or 0o100644,
                    uid=0,
                    gid=0,
                    size=len(result_content),
                    sha=blob.id,
                    flags=0,
                )

            index[tree_path] = entry

            # Handle cleanup for renames with hunks: the helper above only
            # returned the content, so the old file still has to be removed.
            if patch.rename_from is not None and patch.rename_to is not None:
                # Remove old file after successful rename
                old_rename_path = patch.rename_from
                if strip > 0:
                    old_parts = old_rename_path.split(b"/")
                    if len(old_parts) > strip:
                        old_rename_path = b"/".join(old_parts[strip:])

                old_fs_path = os.path.join(
                    r.path.encode("utf-8") if isinstance(r.path, str) else r.path,
                    old_rename_path,
                )

                if not cached and os.path.exists(old_fs_path):
                    os.remove(old_fs_path)
                if old_rename_path in index:
                    del index[old_rename_path]

            index.write()
def mailinfo(
    msg: email.message.Message | BinaryIO | TextIO,
    keep_subject: bool = False,
    keep_non_patch: bool = False,
    encoding: str | None = None,
    scissors: bool = False,
    message_id: bool = False,
) -> MailinfoResult:
    """Extract patch information from an email message.

    Parses an email message and extracts commit metadata (author, email,
    subject), splitting the commit message from the patch content the way
    git mailinfo does.

    Args:
      msg: Email message (email.message.Message object) or file handle to read from
      keep_subject: If True, keep subject intact without munging (-k)
      keep_non_patch: If True, only strip [PATCH] from brackets (-b)
      encoding: Character encoding to use (default: detect from message)
      scissors: If True, remove everything before scissors line
      message_id: If True, include Message-ID in commit message (-m)

    Returns:
      MailinfoResult with parsed information

    Raises:
      ValueError: If message is malformed or missing required fields
    """
    # Accept either an already-parsed message or a file-like object holding
    # the raw message text/bytes.
    message: email.message.Message
    if isinstance(msg, email.message.Message):
        message = msg
    elif hasattr(msg, "read"):
        raw = msg.read()
        if isinstance(raw, bytes):
            message = email.parser.BytesParser().parsebytes(raw)
        else:
            message = email.parser.Parser().parsestr(raw)
    else:
        raise ValueError("msg must be an email.message.Message or file-like object")

    # Fall back to the charset declared by the message itself.
    if encoding is None:
        encoding = message.get_content_charset() or "utf-8"

    # Author: "Name <email>" parsed from the From header.
    sender = message.get("From", "")
    if not sender:
        raise ValueError("Email message missing 'From' header")
    name, addr = email.utils.parseaddr(sender)
    if not addr:
        raise ValueError(
            f"Could not parse email address from 'From' header: {sender}"
        )

    # Author date, if the header is present and non-empty.
    date_value = message.get("Date") or None

    # Subject: default when absent, flatten to one line, then apply the
    # git-mailinfo style munging (strip [PATCH] etc.).
    subject = message.get("Subject", "")
    if not subject:
        subject = "(no subject)"
    subject = str(subject).replace("\n", " ").replace("\r", " ")
    subject = _munge_subject(subject, keep_subject, keep_non_patch)

    # Message-ID is only captured when requested (-m).
    msg_id = message.get("Message-ID") if message_id else None

    # Decode the body into bytes regardless of how the payload is stored.
    payload = message.get_payload(decode=True)
    if payload is None:
        payload = b""
    elif isinstance(payload, str):
        payload = payload.encode(encoding)
    elif not isinstance(payload, bytes):
        # Handle multipart or other payload types
        payload = str(payload).encode(encoding)

    body_lines = payload.splitlines(keepends=True)

    # Scissors handling: drop everything up to and including the scissors line.
    if scissors:
        cut = _find_scissors_line(body_lines)
        if cut is not None:
            body_lines = body_lines[cut + 1 :]

    # Split the body at the "---" separator: everything before it is the
    # commit message, everything after it (up to a "-- " signature marker)
    # is the patch.
    message_parts: list[bytes] = []
    diff_parts: list[bytes] = []
    seen_separator = False
    for raw_line in body_lines:
        if seen_separator:
            if raw_line == b"-- \n":
                # Signature marker ends the patch
                break
            diff_parts.append(raw_line)
        elif raw_line == b"---\n":
            seen_separator = True
            diff_parts.append(raw_line)
        else:
            message_parts.append(raw_line)

    # Assemble and trim the commit message.
    commit_message = b"".join(message_parts).decode(encoding, errors="replace").strip()

    # Append Message-ID if requested
    if message_id and msg_id:
        if commit_message:
            commit_message += "\n\n"
        commit_message += f"Message-ID: {msg_id}"

    patch_text = b"".join(diff_parts).decode(encoding, errors="replace")

    return MailinfoResult(
        author_name=name,
        author_email=addr,
        author_date=date_value,
        subject=subject,
        message=commit_message,
        patch=patch_text,
        message_id=msg_id,
    )