Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# patch.py -- For dealing with packed-style patches.
2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Classes for dealing with git am-style patches.
24These patches are basically unified diffs with some extra metadata tacked
25on.
26"""
28import email.message
29import email.parser
30import email.utils
31import re
32import time
33from collections.abc import Generator, Sequence
34from dataclasses import dataclass
35from difflib import SequenceMatcher
36from typing import (
37 IO,
38 TYPE_CHECKING,
39 BinaryIO,
40 Optional,
41 TextIO,
42)
44if TYPE_CHECKING:
45 from .object_store import BaseObjectStore
47from .objects import S_ISGITLINK, Blob, Commit
49FIRST_FEW_BYTES = 8000
51DEFAULT_DIFF_ALGORITHM = "myers"
class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
          algorithm: Name of the unavailable algorithm
          install_hint: Optional installation hint appended to the message
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        base = f"Diff algorithm '{algorithm}' requested but not available."
        super().__init__(f"{base} {install_hint}" if install_hint else base)
76def write_commit_patch(
77 f: IO[bytes],
78 commit: "Commit",
79 contents: str | bytes,
80 progress: tuple[int, int],
81 version: str | None = None,
82 encoding: str | None = None,
83) -> None:
84 """Write a individual file patch.
86 Args:
87 f: File-like object to write to
88 commit: Commit object
89 contents: Contents of the patch
90 progress: tuple with current patch number and total.
91 version: Version string to include in patch header
92 encoding: Encoding to use for the patch
94 Returns:
95 tuple with filename and contents
96 """
97 encoding = encoding or getattr(f, "encoding", "ascii")
98 if encoding is None:
99 encoding = "ascii"
100 if isinstance(contents, str):
101 contents = contents.encode(encoding)
102 (num, total) = progress
103 f.write(
104 b"From "
105 + commit.id
106 + b" "
107 + time.ctime(commit.commit_time).encode(encoding)
108 + b"\n"
109 )
110 f.write(b"From: " + commit.author + b"\n")
111 f.write(
112 b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
113 )
114 f.write(
115 (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
116 )
117 f.write(b"\n")
118 f.write(b"---\n")
119 try:
120 import subprocess
122 p = subprocess.Popen(
123 ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
124 )
125 except (ImportError, OSError):
126 pass # diffstat not available?
127 else:
128 (diffstat, _) = p.communicate(contents)
129 f.write(diffstat)
130 f.write(b"\n")
131 f.write(contents)
132 f.write(b"-- \n")
133 if version is None:
134 from dulwich import __version__ as dulwich_version
136 f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
137 else:
138 if encoding is None:
139 encoding = "ascii"
140 f.write(version.encode(encoding) + b"\n")
def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string (first message line, spaces replaced by dashes)
    """
    text = commit.message.decode(errors="replace")
    for first_line in text.splitlines():
        return first_line.replace(" ", "-")
    return ""
155# Unified Diff
156def _format_range_unified(start: int, stop: int) -> str:
157 """Convert range to the "ed" format."""
158 # Per the diff spec at http://www.unix.org/single_unix_specification/
159 beginning = start + 1 # lines start numbering with one
160 length = stop - start
161 if length == 1:
162 return f"{beginning}"
163 if not length:
164 beginning -= 1 # empty ranges begin at line just before the range
165 return f"{beginning},{length}"
def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def fmt_range(lo: int, hi: int) -> str:
        # Unified ("ed"-style) range: single line -> just "N"; an empty
        # range is anchored at the line just before it.
        span = hi - lo
        if span == 1:
            return f"{lo + 1}"
        return f"{lo if span == 0 else lo + 1},{span}"

    def flag_missing_newline(line: bytes) -> bytes:
        # Mimic git's "\ No newline at end of file" annotation.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_emitted = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not header_emitted:
            header_emitted = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        yield (
            f"@@ -{fmt_range(first[1], last[2])} "
            f"+{fmt_range(first[3], last[4])} @@{lineterm}"
        ).encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag != "insert":  # "replace" or "delete": emit old lines
                for line in a[i1:i2]:
                    yield b"-" + flag_missing_newline(line)
            if tag != "delete":  # "replace" or "insert": emit new lines
                for line in b[j1:j2]:
                    yield b"+" + flag_missing_newline(line)
219def _get_sequence_matcher(
220 algorithm: str, a: Sequence[bytes], b: Sequence[bytes]
221) -> SequenceMatcher[bytes]:
222 """Get appropriate sequence matcher for the given algorithm.
224 Args:
225 algorithm: Diff algorithm ("myers" or "patience")
226 a: First sequence
227 b: Second sequence
229 Returns:
230 Configured sequence matcher instance
232 Raises:
233 DiffAlgorithmNotAvailable: If patience requested but not available
234 """
235 if algorithm == "patience":
236 try:
237 from patiencediff import PatienceSequenceMatcher
239 return PatienceSequenceMatcher(None, a, b) # type: ignore[no-any-return,unused-ignore]
240 except ImportError:
241 raise DiffAlgorithmNotAvailable(
242 "patience", "Install with: pip install 'dulwich[patiencediff]'"
243 )
244 else:
245 return SequenceMatcher(a=a, b=b)
def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: str | None = None,
) -> Generator[bytes, None, None]:
    """Generate a git-style unified diff using the chosen diff algorithm.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience");
        defaults to DEFAULT_DIFF_ALGORITHM when None

    Returns:
      Generator yielding diff lines

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but
        patiencediff is not available
    """
    matcher = _get_sequence_matcher(
        algorithm if algorithm is not None else DEFAULT_DIFF_ALGORITHM, a, b
    )

    def flag_missing_newline(line: bytes) -> bytes:
        # Mimic git's "\ No newline at end of file" annotation.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_emitted = False
    for group in matcher.get_grouped_opcodes(n):
        if not header_emitted:
            header_emitted = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        old_range = _format_range_unified(first[1], last[2])
        new_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag != "insert":  # "replace" or "delete": emit old lines
                for line in a[i1:i2]:
                    yield b"-" + flag_missing_newline(line)
            if tag != "delete":  # "replace" or "insert": emit new lines
                for line in b[j1:j2]:
                    yield b"+" + flag_missing_newline(line)
def is_binary(content: bytes) -> bool:
    """Heuristically decide whether content is binary.

    Args:
      content: Bytestring to check for binary content

    Returns:
      True if a NUL byte occurs within the first FIRST_FEW_BYTES bytes.
    """
    return content.find(b"\0", 0, FIRST_FEW_BYTES) != -1
331def shortid(hexsha: bytes | None) -> bytes:
332 """Get short object ID.
334 Args:
335 hexsha: Full hex SHA or None
337 Returns:
338 7-character short ID
339 """
340 if hexsha is None:
341 return b"0" * 7
342 else:
343 return hexsha[:7]
346def patch_filename(p: bytes | None, root: bytes) -> bytes:
347 """Generate patch filename.
349 Args:
350 p: Path or None
351 root: Root directory
353 Returns:
354 Full patch filename
355 """
356 if p is None:
357 return b"/dev/null"
358 else:
359 return root + b"/" + p
def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[bytes | None, int | None, bytes | None],
    new_file: tuple[bytes | None, int | None, bytes | None],
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    old_path, old_mode, old_id = old_file
    new_path, new_mode, new_id = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def as_blob(mode: int | None, hexsha: bytes | None) -> Blob:
        """Materialize the file content at (mode, hexsha) as a Blob."""
        if hexsha is None:
            # Nonexistent file: empty content.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Submodule entry: synthesize the conventional placeholder text.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def as_lines(blob: "Blob") -> list[bytes]:
        """Split blob content into lines (empty blob yields no lines)."""
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_blob = as_blob(old_mode, old_id)
    new_blob = as_blob(new_mode, new_id)
    if not diff_binary and (is_binary(old_blob.data) or is_binary(new_blob.data)):
        f.write(
            b"Binary files " + old_label + b" and " + new_label + b" differ\n"
        )
    else:
        f.writelines(
            unified_diff_with_algorithm(
                as_lines(old_blob),
                as_lines(new_blob),
                old_label,
                new_label,
                algorithm=diff_algorithm,
            )
        )
450# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[bytes | None, bytes | None],
    modes: tuple[int | None, int | None],
    shas: tuple[bytes | None, bytes | None],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    old_path, new_path = paths
    old_mode, new_mode = modes
    old_sha, new_sha = shas
    # A missing side inherits the other side's path so the header always
    # names two files (both may still be None for a pathless diff).
    old_path = old_path if old_path is not None else new_path
    new_path = new_path if new_path is not None else old_path
    old_name = patch_filename(old_path, b"a")
    new_name = patch_filename(new_path, b"b")
    yield b"diff --git " + old_name + b" " + new_name + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode is appended to the index line only when it is unchanged
    # (both sides present); mode changes were already reported above.
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
487# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[bytes | None, int | None, Optional["Blob"]],
    new_file: tuple[bytes | None, int | None, Optional["Blob"]],
    diff_algorithm: str | None = None,
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (elements None if nonexisting)
      new_file: (path, mode, blob) tuple (elements None if nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    old_path, old_mode, old_blob = old_file
    new_path, new_mode, new_blob = new_file

    def blob_lines(blob: Optional["Blob"]) -> list[bytes]:
        """Split blob content into lines; a missing blob has none."""
        return [] if blob is None else blob.splitlines()

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff_with_algorithm(
            blob_lines(old_blob),
            blob_lines(new_blob),
            patch_filename(old_path, b"a"),
            patch_filename(new_path, b"b"),
            algorithm=diff_algorithm,
        )
    )
def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: bytes | None,
    new_tree: bytes | None,
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one object diff per changed entry, in tree_changes order.
    for paths, modes, shas in store.tree_changes(old_tree, new_tree):
        write_object_diff(
            f,
            store,
            (paths[0], modes[0], shas[0]),
            (paths[1], modes[1], shas[1]),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )
def git_am_patch_split(
    f: TextIO | BinaryIO, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # f.encoding may exist but be None, so fall through to "ascii" then.
    effective_encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    if isinstance(contents, bytes):
        msg = email.parser.BytesParser().parsebytes(contents)
    else:
        msg = email.parser.Parser().parsestr(contents)
    return parse_patch_message(msg, effective_encoding)
def parse_patch_message(
    msg: email.message.Message, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # The email sender becomes both author and committer; a "From: " line in
    # the body (below) overrides the author when present.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        # No "[PATCH...]" tag: use the subject verbatim.
        subject = msg["subject"]
    else:
        # Drop everything up to and including the "] " closing the tag.
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    # Everything before the "---" separator is part of the commit message.
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                # Author override emitted by git format-patch when the
                # author differs from the email sender.
                c.author = line[len(b"From: ") :].rstrip()
            else:
                # Insert the blank line separating subject from body.
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # Everything between "---" and the "-- " signature marker is the diff.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    # The line after "-- " conventionally carries the git version string.
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version
def patch_id(diff_data: bytes) -> bytes:
    """Compute patch ID for a diff.

    The patch ID is computed by normalizing the diff and computing a SHA1 hash.
    This follows git's patch-id algorithm which:
    1. Removes whitespace from lines starting with + or -
    2. Replaces line numbers in @@ headers with a canonical form
    3. Computes SHA1 of the result

    Args:
      diff_data: Raw diff data as bytes

    Returns:
      SHA1 hash of normalized diff (40-byte hex string)

    TODO: This implementation uses a simple line-by-line approach. For better
    compatibility with git's patch-id, consider using proper patch parsing that:
    - Handles edge cases in diff format (binary diffs, mode changes, etc.)
    - Properly parses unified diff format according to the spec
    - Matches git's exact normalization algorithm byte-for-byte
    See git's patch-id.c for reference implementation.
    """
    import hashlib

    # File-level header lines carry no content and are excluded from the
    # hash entirely.
    header_prefixes = (
        b"diff --git ",
        b"index ",
        b"--- ",
        b"+++ ",
        b"new file mode ",
        b"old file mode ",
        b"deleted file mode ",
        b"new mode ",
        b"old mode ",
        b"similarity index ",
        b"dissimilarity index ",
        b"rename from ",
        b"rename to ",
        b"copy from ",
        b"copy to ",
    )

    # Normalize the diff for patch-id computation
    normalized_lines = []

    for line in diff_data.split(b"\n"):
        if line.startswith(header_prefixes):
            continue

        # Normalize @@ hunk headers to a canonical form so line-number
        # shifts don't change the patch ID.
        if line.startswith(b"@@"):
            if re.match(rb"^@@\s+-\d+(?:,\d+)?\s+\+\d+(?:,\d+)?\s+@@", line):
                normalized_lines.append(b"@@")
                continue

        # For +/- lines, keep the prefix but strip all whitespace from the
        # content so reindentation doesn't change the patch ID.
        if line.startswith((b"+", b"-")):
            normalized_lines.append(
                line[:1] + line[1:].replace(b" ", b"").replace(b"\t", b"")
            )
            continue

        # Keep context lines and blank lines as-is.
        if line.startswith(b" ") or line == b"":
            normalized_lines.append(line)

    # Join normalized lines and compute SHA1.
    return hashlib.sha1(b"\n".join(normalized_lines)).hexdigest().encode("ascii")
def commit_patch_id(store: "BaseObjectStore", commit_id: bytes) -> bytes:
    """Compute patch ID for a commit.

    Args:
      store: Object store to read objects from
      commit_id: Commit ID (40-byte hex string)

    Returns:
      Patch ID (40-byte hex string)
    """
    from io import BytesIO

    commit = store[commit_id]
    assert isinstance(commit, Commit)

    # Diff against the first parent; a root commit is diffed against the
    # empty tree (represented by None).
    parent_tree = None
    if commit.parents:
        parent = store[commit.parents[0]]
        assert isinstance(parent, Commit)
        parent_tree = parent.tree

    buf = BytesIO()
    write_tree_diff(buf, store, parent_tree, commit.tree)
    return patch_id(buf.getvalue())
@dataclass
class MailinfoResult:
    """Result of mailinfo parsing.

    Attributes:
      author_name: Author's name
      author_email: Author's email address
      author_date: Author's date (if present in the email)
      subject: Processed subject line
      message: Commit message body
      patch: Patch content
      message_id: Message-ID header (if -m/--message-id was used)
    """

    author_name: str
    author_email: str
    # None when the email carried no Date header.
    author_date: str | None
    subject: str
    message: str
    patch: str
    # Populated only when mailinfo(..., message_id=True) was requested.
    message_id: str | None = None
789def _munge_subject(subject: str, keep_subject: bool, keep_non_patch: bool) -> str:
790 """Munge email subject line for commit message.
792 Args:
793 subject: Original subject line
794 keep_subject: If True, keep subject intact (-k option)
795 keep_non_patch: If True, only strip [PATCH] (-b option)
797 Returns:
798 Processed subject line
799 """
800 if keep_subject:
801 return subject
803 result = subject
805 # First remove Re: prefixes (they can appear before brackets)
806 while True:
807 new_result = re.sub(r"^\s*(?:re|RE|Re):\s*", "", result, flags=re.IGNORECASE)
808 if new_result == result:
809 break
810 result = new_result
812 # Remove bracketed strings
813 if keep_non_patch:
814 # Only remove brackets containing "PATCH"
815 # Match each bracket individually anywhere in the string
816 while True:
817 # Remove PATCH bracket, but be careful with whitespace
818 new_result = re.sub(
819 r"\[[^\]]*?PATCH[^\]]*?\](\s+)?", r"\1", result, flags=re.IGNORECASE
820 )
821 if new_result == result:
822 break
823 result = new_result
824 else:
825 # Remove all bracketed strings
826 while True:
827 new_result = re.sub(r"^\s*\[.*?\]\s*", "", result)
828 if new_result == result:
829 break
830 result = new_result
832 # Remove leading/trailing whitespace
833 result = result.strip()
835 # Normalize multiple whitespace to single space
836 result = re.sub(r"\s+", " ", result)
838 return result
841def _find_scissors_line(lines: list[bytes]) -> int | None:
842 """Find the scissors line in message body.
844 Args:
845 lines: List of lines in the message body
847 Returns:
848 Index of scissors line, or None if not found
849 """
850 scissors_pattern = re.compile(
851 rb"^(?:>?\s*-+\s*)?(?:8<|>8)?\s*-+\s*$|^(?:>?\s*-+\s*)(?:cut here|scissors)(?:\s*-+)?$",
852 re.IGNORECASE,
853 )
855 for i, line in enumerate(lines):
856 if scissors_pattern.match(line.strip()):
857 return i
859 return None
def mailinfo(
    msg: email.message.Message | BinaryIO | TextIO,
    keep_subject: bool = False,
    keep_non_patch: bool = False,
    encoding: str | None = None,
    scissors: bool = False,
    message_id: bool = False,
) -> MailinfoResult:
    """Extract patch information from an email message.

    This function parses an email message and extracts commit metadata
    (author, email, subject) and separates the commit message from the
    patch content, similar to git mailinfo.

    Args:
      msg: Email message (email.message.Message object) or file handle to read from
      keep_subject: If True, keep subject intact without munging (-k)
      keep_non_patch: If True, only strip [PATCH] from brackets (-b)
      encoding: Character encoding to use (default: detect from message)
      scissors: If True, remove everything before scissors line
      message_id: If True, include Message-ID in commit message (-m)

    Returns:
      MailinfoResult with parsed information

    Raises:
      ValueError: If message is malformed or missing required fields
    """
    # Parse message if given a file handle
    parsed_msg: email.message.Message
    if not isinstance(msg, email.message.Message):
        if hasattr(msg, "read"):
            content = msg.read()
            # Pick the parser matching what read() produced.
            if isinstance(content, bytes):
                bparser = email.parser.BytesParser()
                parsed_msg = bparser.parsebytes(content)
            else:
                sparser = email.parser.Parser()
                parsed_msg = sparser.parsestr(content)
        else:
            raise ValueError("msg must be an email.message.Message or file-like object")
    else:
        parsed_msg = msg

    # Detect encoding from message if not specified
    if encoding is None:
        encoding = parsed_msg.get_content_charset() or "utf-8"

    # Extract author information
    from_header = parsed_msg.get("From", "")
    if not from_header:
        raise ValueError("Email message missing 'From' header")

    # Parse "Name <email>" format
    author_name, author_email = email.utils.parseaddr(from_header)
    if not author_email:
        raise ValueError(
            f"Could not parse email address from 'From' header: {from_header}"
        )

    # Extract date (left as the raw header string, not parsed)
    date_header = parsed_msg.get("Date")
    author_date = date_header if date_header else None

    # Extract and process subject
    subject = parsed_msg.get("Subject", "")
    if not subject:
        subject = "(no subject)"

    # Convert Header object to string if needed
    subject = str(subject)

    # Remove newlines from subject (headers may be folded across lines)
    subject = subject.replace("\n", " ").replace("\r", " ")
    subject = _munge_subject(subject, keep_subject, keep_non_patch)

    # Extract Message-ID if requested
    msg_id = None
    if message_id:
        msg_id = parsed_msg.get("Message-ID")

    # Get message body.
    # NOTE(review): get_payload(decode=True) returns None for multipart
    # messages, which collapses to an empty body below — presumably callers
    # pass single-part patch mails; verify for multipart input.
    body = parsed_msg.get_payload(decode=True)
    if body is None:
        body = b""
    elif isinstance(body, str):
        body = body.encode(encoding)
    elif not isinstance(body, bytes):
        # Handle multipart or other types
        body = str(body).encode(encoding)

    # Split into lines
    lines = body.splitlines(keepends=True)

    # Handle scissors
    scissors_idx = None
    if scissors:
        scissors_idx = _find_scissors_line(lines)
        if scissors_idx is not None:
            # Remove everything up to and including scissors line
            lines = lines[scissors_idx + 1 :]

    # Separate commit message from patch
    # Look for the "---" separator that indicates start of diffstat/patch
    message_lines: list[bytes] = []
    patch_lines: list[bytes] = []
    in_patch = False

    for line in lines:
        if not in_patch and line == b"---\n":
            # The separator itself is kept as part of the patch content.
            in_patch = True
            patch_lines.append(line)
        elif in_patch:
            # Stop at signature marker "-- "
            if line == b"-- \n":
                break
            patch_lines.append(line)
        else:
            message_lines.append(line)

    # Build commit message
    commit_message = b"".join(message_lines).decode(encoding, errors="replace")

    # Clean up commit message
    commit_message = commit_message.strip()

    # Append Message-ID if requested
    if message_id and msg_id:
        if commit_message:
            commit_message += "\n\n"
        commit_message += f"Message-ID: {msg_id}"

    # Build patch content
    patch_content = b"".join(patch_lines).decode(encoding, errors="replace")

    return MailinfoResult(
        author_name=author_name,
        author_email=author_email,
        author_date=author_date,
        subject=subject,
        message=commit_message,
        patch=patch_content,
        message_id=msg_id,
    )