Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# patch.py -- For dealing with packed-style patches.
2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Classes for dealing with git am-style patches.
24These patches are basically unified diffs with some extra metadata tacked
25on.
26"""
# Public API of this module; keep entries sorted alphabetically.
__all__ = [
    "DEFAULT_DIFF_ALGORITHM",
    "FIRST_FEW_BYTES",
    "DiffAlgorithmNotAvailable",
    "MailinfoResult",
    "commit_patch_id",
    "gen_diff_header",
    "get_summary",
    "git_am_patch_split",
    "is_binary",
    "mailinfo",
    "parse_patch_message",
    "patch_filename",
    "patch_id",
    "shortid",
    "unified_diff",
    "unified_diff_with_algorithm",
    "write_blob_diff",
    "write_commit_patch",
    "write_object_diff",
    "write_tree_diff",
]
51import email.message
52import email.parser
53import email.utils
54import re
55import time
56from collections.abc import Generator, Sequence
57from dataclasses import dataclass
58from difflib import SequenceMatcher
59from typing import (
60 IO,
61 TYPE_CHECKING,
62 BinaryIO,
63 TextIO,
64)
66if TYPE_CHECKING:
67 from .object_store import BaseObjectStore
69from .objects import S_ISGITLINK, Blob, Commit, ObjectID, RawObjectID
# Number of leading bytes inspected by is_binary() when sniffing for NULs.
FIRST_FEW_BYTES = 8000

# Diff algorithm used when callers pass algorithm/diff_algorithm=None.
DEFAULT_DIFF_ALGORITHM = "myers"
class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
          algorithm: Name of the unavailable algorithm
          install_hint: Optional installation hint
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        # Build the message once; append the hint only when one was given.
        message = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            message = f"{message} {install_hint}"
        super().__init__(message)
98def write_commit_patch(
99 f: IO[bytes],
100 commit: "Commit",
101 contents: str | bytes,
102 progress: tuple[int, int],
103 version: str | None = None,
104 encoding: str | None = None,
105) -> None:
106 """Write a individual file patch.
108 Args:
109 f: File-like object to write to
110 commit: Commit object
111 contents: Contents of the patch
112 progress: tuple with current patch number and total.
113 version: Version string to include in patch header
114 encoding: Encoding to use for the patch
116 Returns:
117 tuple with filename and contents
118 """
119 encoding = encoding or getattr(f, "encoding", "ascii")
120 if encoding is None:
121 encoding = "ascii"
122 if isinstance(contents, str):
123 contents = contents.encode(encoding)
124 (num, total) = progress
125 f.write(
126 b"From "
127 + commit.id
128 + b" "
129 + time.ctime(commit.commit_time).encode(encoding)
130 + b"\n"
131 )
132 f.write(b"From: " + commit.author + b"\n")
133 f.write(
134 b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
135 )
136 f.write(
137 (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
138 )
139 f.write(b"\n")
140 f.write(b"---\n")
141 try:
142 import subprocess
144 p = subprocess.Popen(
145 ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
146 )
147 except (ImportError, OSError):
148 pass # diffstat not available?
149 else:
150 (diffstat, _) = p.communicate(contents)
151 f.write(diffstat)
152 f.write(b"\n")
153 f.write(contents)
154 f.write(b"-- \n")
155 if version is None:
156 from dulwich import __version__ as dulwich_version
158 f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
159 else:
160 if encoding is None:
161 encoding = "ascii"
162 f.write(version.encode(encoding) + b"\n")
def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string
    """
    message = commit.message.decode(errors="replace")
    # Use the first line of the message; an empty message yields "".
    for first_line in message.splitlines():
        return first_line.replace(" ", "-")
    return ""
177# Unified Diff
178def _format_range_unified(start: int, stop: int) -> str:
179 """Convert range to the "ed" format."""
180 # Per the diff spec at http://www.unix.org/single_unix_specification/
181 beginning = start + 1 # lines start numbering with one
182 length = stop - start
183 if length == 1:
184 return f"{beginning}"
185 if not length:
186 beginning -= 1 # empty ranges begin at line just before the range
187 return f"{beginning},{length}"
def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def _fmt_range(lo: int, hi: int) -> str:
        # "ed"-style range: 1-based start; length omitted when it is 1,
        # empty ranges anchored at the line just before.
        count = hi - lo
        if count == 1:
            return f"{lo + 1}"
        start = lo + 1 if count else lo
        return f"{start},{count}"

    def _mark_missing_newline(line: bytes) -> bytes:
        # Mimic git's "\ No newline at end of file" annotation.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    emitted_header = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not emitted_header:
            emitted_header = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            from_header = f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}"
            to_header = f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}"
            yield from_header.encode(output_encoding)
            yield to_header.encode(output_encoding)

        old_range = _fmt_range(group[0][1], group[-1][2])
        new_range = _fmt_range(group[0][3], group[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for unchanged in a[i1:i2]:
                    yield b" " + unchanged
                continue
            if tag in ("replace", "delete"):
                for removed in a[i1:i2]:
                    yield b"-" + _mark_missing_newline(removed)
            if tag in ("replace", "insert"):
                for added in b[j1:j2]:
                    yield b"+" + _mark_missing_newline(added)
241def _get_sequence_matcher(
242 algorithm: str, a: Sequence[bytes], b: Sequence[bytes]
243) -> SequenceMatcher[bytes]:
244 """Get appropriate sequence matcher for the given algorithm.
246 Args:
247 algorithm: Diff algorithm ("myers" or "patience")
248 a: First sequence
249 b: Second sequence
251 Returns:
252 Configured sequence matcher instance
254 Raises:
255 DiffAlgorithmNotAvailable: If patience requested but not available
256 """
257 if algorithm == "patience":
258 try:
259 from patiencediff import PatienceSequenceMatcher
261 return PatienceSequenceMatcher(None, a, b) # type: ignore[no-any-return,unused-ignore]
262 except ImportError:
263 raise DiffAlgorithmNotAvailable(
264 "patience", "Install with: pip install 'dulwich[patiencediff]'"
265 )
266 else:
267 return SequenceMatcher(a=a, b=b)
def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: str | None = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience")

    Returns:
      Generator yielding diff lines

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
    """
    matcher = _get_sequence_matcher(
        algorithm if algorithm is not None else DEFAULT_DIFF_ALGORITHM, a, b
    )

    def _ensure_trailing_newline(line: bytes) -> bytes:
        # Emulate git's "\ No newline at end of file" annotation.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_pending = True
    for group in matcher.get_grouped_opcodes(n):
        if header_pending:
            header_pending = False
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        old_range = _format_range_unified(group[0][1], group[-1][2])
        new_range = _format_range_unified(group[0][3], group[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for unchanged in a[i1:i2]:
                    yield b" " + unchanged
                continue
            if tag in ("replace", "delete"):
                for removed in a[i1:i2]:
                    yield b"-" + _ensure_trailing_newline(removed)
            if tag in ("replace", "insert"):
                for added in b[j1:j2]:
                    yield b"+" + _ensure_trailing_newline(added)
def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    # Only the leading FIRST_FEW_BYTES are sniffed, like git does.
    return content[:FIRST_FEW_BYTES].find(b"\0") != -1
353def shortid(hexsha: bytes | None) -> bytes:
354 """Get short object ID.
356 Args:
357 hexsha: Full hex SHA or None
359 Returns:
360 7-character short ID
361 """
362 if hexsha is None:
363 return b"0" * 7
364 else:
365 return hexsha[:7]
368def patch_filename(p: bytes | None, root: bytes) -> bytes:
369 """Generate patch filename.
371 Args:
372 p: Path or None
373 root: Root directory
375 Returns:
376 Full patch filename
377 """
378 if p is None:
379 return b"/dev/null"
380 else:
381 return root + b"/" + p
def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[bytes | None, int | None, ObjectID | None],
    new_file: tuple[bytes | None, int | None, ObjectID | None],
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    old_path, old_mode, old_id = old_file
    new_path, new_mode, new_id = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def _blob_for(mode: int | None, hexsha: ObjectID | None) -> Blob:
        """Resolve (mode, sha) to a Blob, synthesizing one when needed."""
        if hexsha is None:
            # Nonexistent side: empty content.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Submodule entry: git shows a synthetic one-line blob.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def _blob_lines(blob: "Blob") -> list[bytes]:
        """Split blob content into lines (empty blob -> no lines)."""
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = _blob_for(old_mode, old_id)
    new_content = _blob_for(new_mode, new_id)
    looks_binary = is_binary(old_content.data) or is_binary(new_content.data)
    if looks_binary and not diff_binary:
        f.write(
            b"Binary files " + old_label + b" and " + new_label + b" differ\n"
        )
    else:
        f.writelines(
            unified_diff_with_algorithm(
                _blob_lines(old_content),
                _blob_lines(new_content),
                old_label,
                new_label,
                algorithm=diff_algorithm,
            )
        )
# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[bytes | None, bytes | None],
    modes: tuple[int | None, int | None],
    shas: tuple[bytes | None, bytes | None],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    old_path, new_path = paths
    old_mode, new_mode = modes
    old_sha, new_sha = shas
    # Borrow the surviving side's path when one side is missing so both
    # labels in the "diff --git" line are populated.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")
    yield b"diff --git " + old_label + b" " + new_label + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode suffix is only printed when the entry exists on both sides.
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[bytes | None, int | None, "Blob | None"],
    new_file: tuple[bytes | None, int | None, "Blob | None"],
    diff_algorithm: str | None = None,
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, hexsha) tuple (None if nonexisting)
      new_file: (path, mode, hexsha) tuple (None if nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    old_path, old_mode, old_blob = old_file
    new_path, new_mode, new_blob = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def _blob_lines(blob: "Blob | None") -> list[bytes]:
        """Split blob content into lines; None yields no lines."""
        return [] if blob is None else blob.splitlines()

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff_with_algorithm(
            _blob_lines(old_blob),
            _blob_lines(new_blob),
            old_label,
            new_label,
            algorithm=diff_algorithm,
        )
    )
def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: ObjectID | None,
    new_tree: ObjectID | None,
    diff_binary: bool = False,
    diff_algorithm: str | None = None,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one object diff per changed entry, in tree_changes order.
    for paths, modes, shas in store.tree_changes(old_tree, new_tree):
        write_object_diff(
            f,
            store,
            (paths[0], modes[0], shas[0]),
            (paths[1], modes[1], shas[1]),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )
def git_am_patch_split(
    f: TextIO | BinaryIO, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # Prefer the caller's encoding, then the stream's, then ASCII.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    # Pick the parser matching the payload type (bytes vs text).
    if isinstance(contents, bytes):
        msg: email.message.Message = email.parser.BytesParser().parsebytes(contents)
    else:
        msg = email.parser.Parser().parsestr(contents)
    return parse_patch_message(msg, encoding)
def parse_patch_message(
    msg: email.message.Message, encoding: str | None = None
) -> tuple["Commit", bytes, bytes | None]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # The From: header supplies both author and committer.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        # No "[PATCH ...]" tag present; use the whole subject.
        subject = msg["subject"]
    else:
        # Drop everything through the closing "] " of the [PATCH ...] tag.
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True  # True until the first body line has been processed

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    # Body lines up to the "---" separator extend the commit message; an
    # initial "From: " line instead overrides the author from the headers.
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # Everything after "---" up to the "-- " signature marker is the diff.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    # The line following the signature marker, if any, names the git version.
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version
def patch_id(diff_data: bytes) -> bytes:
    """Compute patch ID for a diff.

    The patch ID is computed by normalizing the diff and computing a SHA1 hash.
    This follows git's patch-id algorithm which:
    1. Removes whitespace from lines starting with + or -
    2. Replaces line numbers in @@ headers with a canonical form
    3. Computes SHA1 of the result

    Args:
      diff_data: Raw diff data as bytes

    Returns:
      SHA1 hash of normalized diff (40-byte hex string)

    TODO: This implementation uses a simple line-by-line approach. For better
    compatibility with git's patch-id, consider using proper patch parsing that:
    - Handles edge cases in diff format (binary diffs, mode changes, etc.)
    - Properly parses unified diff format according to the spec
    - Matches git's exact normalization algorithm byte-for-byte
    See git's patch-id.c for reference implementation.
    """
    import hashlib
    import re

    # File-level headers carry no content identity and are dropped.
    header_prefixes = (
        b"diff --git ",
        b"index ",
        b"--- ",
        b"+++ ",
        b"new file mode ",
        b"old file mode ",
        b"deleted file mode ",
        b"new mode ",
        b"old mode ",
        b"similarity index ",
        b"dissimilarity index ",
        b"rename from ",
        b"rename to ",
        b"copy from ",
        b"copy to ",
    )
    hunk_re = re.compile(rb"^@@\s+-\d+(?:,\d+)?\s+\+\d+(?:,\d+)?\s+@@")

    normalized: list[bytes] = []
    for raw in diff_data.split(b"\n"):
        if raw.startswith(header_prefixes):
            continue
        if raw.startswith(b"@@") and hunk_re.match(raw):
            # Line numbers are position-dependent; canonicalize them away.
            normalized.append(b"@@")
            continue
        if raw.startswith((b"+", b"-")):
            # Keep the +/- marker, strip all whitespace from the content.
            body = raw[1:].replace(b" ", b"").replace(b"\t", b"")
            normalized.append(raw[:1] + body)
            continue
        # Context lines and blank lines participate unchanged; anything
        # else (unrecognized noise) is dropped.
        if raw.startswith(b" ") or raw == b"":
            normalized.append(raw)

    return hashlib.sha1(b"\n".join(normalized)).hexdigest().encode("ascii")
def commit_patch_id(
    store: "BaseObjectStore", commit_id: ObjectID | RawObjectID
) -> bytes:
    """Compute patch ID for a commit.

    Args:
      store: Object store to read objects from
      commit_id: Commit ID (40-byte hex string)

    Returns:
      Patch ID (40-byte hex string)
    """
    from io import BytesIO

    commit = store[commit_id]
    assert isinstance(commit, Commit)

    # Diff against the first parent's tree; a root commit is compared
    # against the empty tree (parent_tree=None).
    parent_tree = None
    if commit.parents:
        parent = store[commit.parents[0]]
        assert isinstance(parent, Commit)
        parent_tree = parent.tree

    buf = BytesIO()
    write_tree_diff(buf, store, parent_tree, commit.tree)
    return patch_id(buf.getvalue())
@dataclass
class MailinfoResult:
    """Result of mailinfo parsing.

    Attributes:
      author_name: Author's name
      author_email: Author's email address
      author_date: Author's date (if present in the email)
      subject: Processed subject line
      message: Commit message body
      patch: Patch content
      message_id: Message-ID header (if -m/--message-id was used)
    """

    # Name/email as parsed from the From: header.
    author_name: str
    author_email: str
    # Raw Date: header value; None when the header was absent.
    author_date: str | None
    # Subject after Re:/bracket munging (unless -k was given).
    subject: str
    message: str
    patch: str
    # Populated only when mailinfo() was called with message_id=True.
    message_id: str | None = None
813def _munge_subject(subject: str, keep_subject: bool, keep_non_patch: bool) -> str:
814 """Munge email subject line for commit message.
816 Args:
817 subject: Original subject line
818 keep_subject: If True, keep subject intact (-k option)
819 keep_non_patch: If True, only strip [PATCH] (-b option)
821 Returns:
822 Processed subject line
823 """
824 if keep_subject:
825 return subject
827 result = subject
829 # First remove Re: prefixes (they can appear before brackets)
830 while True:
831 new_result = re.sub(r"^\s*(?:re|RE|Re):\s*", "", result, flags=re.IGNORECASE)
832 if new_result == result:
833 break
834 result = new_result
836 # Remove bracketed strings
837 if keep_non_patch:
838 # Only remove brackets containing "PATCH"
839 # Match each bracket individually anywhere in the string
840 while True:
841 # Remove PATCH bracket, but be careful with whitespace
842 new_result = re.sub(
843 r"\[[^\]]*?PATCH[^\]]*?\](\s+)?", r"\1", result, flags=re.IGNORECASE
844 )
845 if new_result == result:
846 break
847 result = new_result
848 else:
849 # Remove all bracketed strings
850 while True:
851 new_result = re.sub(r"^\s*\[.*?\]\s*", "", result)
852 if new_result == result:
853 break
854 result = new_result
856 # Remove leading/trailing whitespace
857 result = result.strip()
859 # Normalize multiple whitespace to single space
860 result = re.sub(r"\s+", " ", result)
862 return result
865def _find_scissors_line(lines: list[bytes]) -> int | None:
866 """Find the scissors line in message body.
868 Args:
869 lines: List of lines in the message body
871 Returns:
872 Index of scissors line, or None if not found
873 """
874 scissors_pattern = re.compile(
875 rb"^(?:>?\s*-+\s*)?(?:8<|>8)?\s*-+\s*$|^(?:>?\s*-+\s*)(?:cut here|scissors)(?:\s*-+)?$",
876 re.IGNORECASE,
877 )
879 for i, line in enumerate(lines):
880 if scissors_pattern.match(line.strip()):
881 return i
883 return None
def mailinfo(
    msg: email.message.Message | BinaryIO | TextIO,
    keep_subject: bool = False,
    keep_non_patch: bool = False,
    encoding: str | None = None,
    scissors: bool = False,
    message_id: bool = False,
) -> MailinfoResult:
    """Extract patch information from an email message.

    This function parses an email message and extracts commit metadata
    (author, email, subject) and separates the commit message from the
    patch content, similar to git mailinfo.

    Args:
      msg: Email message (email.message.Message object) or file handle to read from
      keep_subject: If True, keep subject intact without munging (-k)
      keep_non_patch: If True, only strip [PATCH] from brackets (-b)
      encoding: Character encoding to use (default: detect from message)
      scissors: If True, remove everything before scissors line
      message_id: If True, include Message-ID in commit message (-m)

    Returns:
      MailinfoResult with parsed information

    Raises:
      ValueError: If message is malformed or missing required fields
    """
    # Parse message if given a file handle
    parsed_msg: email.message.Message
    if not isinstance(msg, email.message.Message):
        if hasattr(msg, "read"):
            content = msg.read()
            # Binary streams go through BytesParser, text through Parser.
            if isinstance(content, bytes):
                bparser = email.parser.BytesParser()
                parsed_msg = bparser.parsebytes(content)
            else:
                sparser = email.parser.Parser()
                parsed_msg = sparser.parsestr(content)
        else:
            raise ValueError("msg must be an email.message.Message or file-like object")
    else:
        parsed_msg = msg

    # Detect encoding from message if not specified
    if encoding is None:
        encoding = parsed_msg.get_content_charset() or "utf-8"

    # Extract author information
    from_header = parsed_msg.get("From", "")
    if not from_header:
        raise ValueError("Email message missing 'From' header")

    # Parse "Name <email>" format
    author_name, author_email = email.utils.parseaddr(from_header)
    if not author_email:
        raise ValueError(
            f"Could not parse email address from 'From' header: {from_header}"
        )

    # Extract date (raw header value, not parsed into a timestamp)
    date_header = parsed_msg.get("Date")
    author_date = date_header if date_header else None

    # Extract and process subject
    subject = parsed_msg.get("Subject", "")
    if not subject:
        subject = "(no subject)"

    # Convert Header object to string if needed
    subject = str(subject)

    # Remove newlines from subject (headers may be folded across lines)
    subject = subject.replace("\n", " ").replace("\r", " ")
    subject = _munge_subject(subject, keep_subject, keep_non_patch)

    # Extract Message-ID if requested
    msg_id = None
    if message_id:
        msg_id = parsed_msg.get("Message-ID")

    # Get message body (get_payload may yield None, str, bytes or other)
    body = parsed_msg.get_payload(decode=True)
    if body is None:
        body = b""
    elif isinstance(body, str):
        body = body.encode(encoding)
    elif not isinstance(body, bytes):
        # Handle multipart or other types
        body = str(body).encode(encoding)

    # Split into lines
    lines = body.splitlines(keepends=True)

    # Handle scissors
    scissors_idx = None
    if scissors:
        scissors_idx = _find_scissors_line(lines)
        if scissors_idx is not None:
            # Remove everything up to and including scissors line
            lines = lines[scissors_idx + 1 :]

    # Separate commit message from patch
    # Look for the "---" separator that indicates start of diffstat/patch
    # (the separator line itself is kept as part of the patch content).
    message_lines: list[bytes] = []
    patch_lines: list[bytes] = []
    in_patch = False

    for line in lines:
        if not in_patch and line == b"---\n":
            in_patch = True
            patch_lines.append(line)
        elif in_patch:
            # Stop at signature marker "-- "
            if line == b"-- \n":
                break
            patch_lines.append(line)
        else:
            message_lines.append(line)

    # Build commit message
    commit_message = b"".join(message_lines).decode(encoding, errors="replace")

    # Clean up commit message
    commit_message = commit_message.strip()

    # Append Message-ID if requested
    if message_id and msg_id:
        if commit_message:
            commit_message += "\n\n"
        commit_message += f"Message-ID: {msg_id}"

    # Build patch content
    patch_content = b"".join(patch_lines).decode(encoding, errors="replace")

    return MailinfoResult(
        author_name=author_name,
        author_email=author_email,
        author_date=author_date,
        subject=subject,
        message=commit_message,
        patch=patch_content,
        message_id=msg_id,
    )