Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# patch.py -- For dealing with packed-style patches.
2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Classes for dealing with git am-style patches.
24These patches are basically unified diffs with some extra metadata tacked
25on.
26"""
28import email.parser
29import time
30from collections.abc import Generator, Sequence
31from difflib import SequenceMatcher
32from typing import (
33 IO,
34 TYPE_CHECKING,
35 BinaryIO,
36 Optional,
37 TextIO,
38 Union,
39)
41if TYPE_CHECKING:
42 import email.message
44 from .object_store import BaseObjectStore
46from .objects import S_ISGITLINK, Blob, Commit
48FIRST_FEW_BYTES = 8000
50DEFAULT_DIFF_ALGORITHM = "myers"
class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
          algorithm: Name of the unavailable algorithm
          install_hint: Optional installation hint appended to the message
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        base = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            super().__init__(f"{base} {install_hint}")
        else:
            super().__init__(base)
def write_commit_patch(
    f: IO[bytes],
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual commit as a git-am style patch to *f*.

    Args:
      f: File-like object to write to
      commit: Commit object
      contents: Contents of the patch (the unified diff body)
      progress: tuple with current patch number and total.
      version: Version string to include in the patch trailer; when None,
        the running Dulwich version is used instead
      encoding: Encoding to use for the patch; falls back to the stream's
        own encoding, then "ascii"
    """
    # Prefer an explicit encoding, then the stream's declared encoding.
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        # f.encoding may exist but be None (e.g. some binary streams).
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    # mbox-style "From <sha> <date>" separator line, as "git format-patch"
    # produces.
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    # NOTE(review): this is the current local time, not commit.commit_time —
    # presumably intentional (time the patch was generated); confirm.
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    # Best effort: include a diffstat summary when the external "diffstat"
    # tool is installed; silently skip it otherwise.
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    # "-- \n" marks the start of the signature/trailer, per mail convention.
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        if encoding is None:
            encoding = "ascii"
        f.write(version.encode(encoding) + b"\n")
def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string (first message line, spaces replaced by dashes)
    """
    text = commit.message.decode(errors="replace")
    first_line = next(iter(text.splitlines()), "")
    return first_line.replace(" ", "-")
154# Unified Diff
155def _format_range_unified(start: int, stop: int) -> str:
156 """Convert range to the "ed" format."""
157 # Per the diff spec at http://www.unix.org/single_unix_specification/
158 beginning = start + 1 # lines start numbering with one
159 length = stop - start
160 if length == 1:
161 return f"{beginning}"
162 if not length:
163 beginning -= 1 # empty ranges begin at line just before the range
164 return f"{beginning},{length}"
def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def flag_missing_newline(line: bytes) -> bytes:
        # Emulate git's "\ No newline at end of file" marker.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_emitted = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not header_emitted:
            # The "---"/"+++" file header is written once, before the
            # first hunk.
            header_emitted = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        old_range = _format_range_unified(group[0][1], group[-1][2])
        new_range = _format_range_unified(group[0][3], group[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag != "insert":  # "replace" or "delete"
                for line in a[i1:i2]:
                    yield b"-" + flag_missing_newline(line)
            if tag != "delete":  # "replace" or "insert"
                for line in b[j1:j2]:
                    yield b"+" + flag_missing_newline(line)
218def _get_sequence_matcher(
219 algorithm: str, a: Sequence[bytes], b: Sequence[bytes]
220) -> SequenceMatcher[bytes]:
221 """Get appropriate sequence matcher for the given algorithm.
223 Args:
224 algorithm: Diff algorithm ("myers" or "patience")
225 a: First sequence
226 b: Second sequence
228 Returns:
229 Configured sequence matcher instance
231 Raises:
232 DiffAlgorithmNotAvailable: If patience requested but not available
233 """
234 if algorithm == "patience":
235 try:
236 from patiencediff import PatienceSequenceMatcher
238 return PatienceSequenceMatcher(None, a, b) # type: ignore[no-any-return,unused-ignore]
239 except ImportError:
240 raise DiffAlgorithmNotAvailable(
241 "patience", "Install with: pip install 'dulwich[patiencediff]'"
242 )
243 else:
244 return SequenceMatcher(a=a, b=b)
def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: Optional[str] = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience");
        None selects DEFAULT_DIFF_ALGORITHM

    Returns:
      Generator yielding diff lines

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
    """
    matcher = _get_sequence_matcher(
        DEFAULT_DIFF_ALGORITHM if algorithm is None else algorithm, a, b
    )

    def flag_missing_newline(line: bytes) -> bytes:
        # Emulate git's "\ No newline at end of file" marker.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_emitted = False
    for group in matcher.get_grouped_opcodes(n):
        if not header_emitted:
            # The "---"/"+++" file header is written once, before the
            # first hunk.
            header_emitted = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        old_range = _format_range_unified(group[0][1], group[-1][2])
        new_range = _format_range_unified(group[0][3], group[-1][4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag != "insert":  # "replace" or "delete"
                for line in a[i1:i2]:
                    yield b"-" + flag_missing_newline(line)
            if tag != "delete":  # "replace" or "insert"
                for line in b[j1:j2]:
                    yield b"+" + flag_missing_newline(line)
def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    # Only the leading FIRST_FEW_BYTES are inspected, matching git's
    # heuristic of sniffing a prefix rather than the whole file.
    prefix = content[:FIRST_FEW_BYTES]
    return prefix.find(b"\0") != -1
def shortid(hexsha: Optional[bytes]) -> bytes:
    """Get short object ID.

    Args:
      hexsha: Full hex SHA or None

    Returns:
      7-character short ID (all zeros for a missing object)
    """
    return b"0000000" if hexsha is None else hexsha[:7]
def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Generate patch filename.

    Args:
      p: Path or None
      root: Root directory prefix (e.g. b"a" or b"b")

    Returns:
      Full patch filename; b"/dev/null" for a missing file
    """
    if p is None:
        return b"/dev/null"
    return root + b"/" + p
def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    # Git-style prefixed paths ("a/<path>" / "b/<path>"); missing files
    # render as "/dev/null".
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        """Get blob content for a file.

        Args:
          mode: File mode
          hexsha: Object SHA

        Returns:
          Blob object: empty for a missing file, a synthetic
          "Subproject commit <sha>" blob for gitlink (submodule) entries,
          otherwise the stored blob.
        """
        if hexsha is None:
            return Blob.from_string(b"")
        elif mode is not None and S_ISGITLINK(mode):
            # Submodule entry: diff the recorded commit id, not a tree.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return Blob.from_string(obj.as_raw_string())

    def lines(content: "Blob") -> list[bytes]:
        """Split blob content into lines.

        Args:
          content: Blob content

        Returns:
          List of lines
          # NOTE(review): assumes Blob.splitlines keeps line terminators,
          # which the diff output relies on — confirm in objects.py
        """
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        # Match "git diff" output for binary files unless the caller
        # explicitly requested a content diff.
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff_with_algorithm(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
                algorithm=diff_algorithm,
            )
        )
449# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    # For adds/deletes, reuse the surviving side's path on both sides of
    # the "diff --git" line, as git does.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode is appended to the index line only when it is unchanged
    # enough to be meaningful, i.e. present on both sides.
    if old_mode is not None and new_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
486# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None elements if nonexisting)
      new_file: (path, mode, blob) tuple (None elements if nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def lines(blob: Optional["Blob"]) -> list[bytes]:
        """Return the blob's lines, or an empty list for a missing blob."""
        return blob.splitlines() if blob is not None else []

    header = gen_diff_header(
        (old_path, new_path),
        (old_mode, new_mode),
        # Blobs may be None; fall back to None ids for /dev/null sides.
        (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
    )
    f.writelines(header)
    f.writelines(
        unified_diff_with_algorithm(
            lines(old_blob),
            lines(new_blob),
            patched_old_path,
            patched_new_path,
            algorithm=diff_algorithm,
        )
    )
def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one object diff per changed entry reported by the store.
    for change in store.tree_changes(old_tree, new_tree):
        (old_path, new_path), (old_mode, new_mode), (old_sha, new_sha) = change
        write_object_diff(
            f,
            store,
            (old_path, old_mode, old_sha),
            (new_path, new_mode, new_sha),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )
def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # Fall back to the stream's declared encoding, then ASCII.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    data = f.read()
    # Pick the parser matching the stream type (binary vs text).
    if isinstance(data, bytes):
        message = email.parser.BytesParser().parsebytes(data)
    else:
        message = email.parser.Parser().parsestr(data)
    return parse_patch_message(message, encoding)
def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # Author/committer both default to the mail's From header; the author
    # may be overridden by an in-body "From: " line below.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        # Strip a "[PATCH n/m] " style prefix from the subject, if present.
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    # Everything up to the "---" separator extends the commit message.
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            # A leading "From: " body line overrides the header author;
            # otherwise the first body line starts a new message paragraph.
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # The diff runs from after "---" up to the "-- " signature marker.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    try:
        # The line after "-- " carries the git/dulwich version string.
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version