Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# patch.py -- For dealing with packed-style patches.
2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Classes for dealing with git am-style patches.
24These patches are basically unified diffs with some extra metadata tacked
25on.
26"""
28import email.parser
29import time
30from collections.abc import Generator
31from difflib import SequenceMatcher
32from typing import (
33 IO,
34 TYPE_CHECKING,
35 BinaryIO,
36 Optional,
37 TextIO,
38 Union,
39)
41if TYPE_CHECKING:
42 import email.message
44 from .object_store import BaseObjectStore
46from .objects import S_ISGITLINK, Blob, Commit
# Only this many leading bytes are scanned when deciding whether content
# is binary (see is_binary()).
FIRST_FEW_BYTES = 8000

# Diff algorithm used when the caller does not request one explicitly.
DEFAULT_DIFF_ALGORITHM = "myers"
class DiffAlgorithmNotAvailable(Exception):
    """Raised when a requested diff algorithm is not available."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Initialize exception.

        Args:
          algorithm: Name of the unavailable algorithm
          install_hint: Optional installation hint
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        base = f"Diff algorithm '{algorithm}' requested but not available."
        # Append the installation hint only when one was supplied.
        message = f"{base} {install_hint}" if install_hint else base
        super().__init__(message)
75def write_commit_patch(
76 f: IO[bytes],
77 commit: "Commit",
78 contents: Union[str, bytes],
79 progress: tuple[int, int],
80 version: Optional[str] = None,
81 encoding: Optional[str] = None,
82) -> None:
83 """Write a individual file patch.
85 Args:
86 f: File-like object to write to
87 commit: Commit object
88 contents: Contents of the patch
89 progress: tuple with current patch number and total.
90 version: Version string to include in patch header
91 encoding: Encoding to use for the patch
93 Returns:
94 tuple with filename and contents
95 """
96 encoding = encoding or getattr(f, "encoding", "ascii")
97 if encoding is None:
98 encoding = "ascii"
99 if isinstance(contents, str):
100 contents = contents.encode(encoding)
101 (num, total) = progress
102 f.write(
103 b"From "
104 + commit.id
105 + b" "
106 + time.ctime(commit.commit_time).encode(encoding)
107 + b"\n"
108 )
109 f.write(b"From: " + commit.author + b"\n")
110 f.write(
111 b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
112 )
113 f.write(
114 (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
115 )
116 f.write(b"\n")
117 f.write(b"---\n")
118 try:
119 import subprocess
121 p = subprocess.Popen(
122 ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
123 )
124 except (ImportError, OSError):
125 pass # diffstat not available?
126 else:
127 (diffstat, _) = p.communicate(contents)
128 f.write(diffstat)
129 f.write(b"\n")
130 f.write(contents)
131 f.write(b"-- \n")
132 if version is None:
133 from dulwich import __version__ as dulwich_version
135 f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
136 else:
137 if encoding is None:
138 encoding = "ascii"
139 f.write(version.encode(encoding) + b"\n")
def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string
    """
    message_lines = commit.message.decode(errors="replace").splitlines()
    if not message_lines:
        return ""
    # Spaces become dashes so the summary is filesystem-friendly.
    return message_lines[0].replace(" ", "-")
154# Unified Diff
155def _format_range_unified(start: int, stop: int) -> str:
156 """Convert range to the "ed" format."""
157 # Per the diff spec at http://www.unix.org/single_unix_specification/
158 beginning = start + 1 # lines start numbering with one
159 length = stop - start
160 if length == 1:
161 return f"{beginning}"
162 if not length:
163 beginning -= 1 # empty ranges begin at line just before the range
164 return f"{beginning},{length}"
def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """
    header_emitted = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not header_emitted:
            # The ---/+++ header is written once, before the first hunk.
            header_emitted = True
            from_date = f"\t{fromfiledate}" if fromfiledate else ""
            to_date = f"\t{tofiledate}" if tofiledate else ""
            from_name = fromfile.decode(tree_encoding)
            to_name = tofile.decode(tree_encoding)
            yield f"--- {from_name}{from_date}{lineterm}".encode(output_encoding)
            yield f"+++ {to_name}{to_date}{lineterm}".encode(output_encoding)

        first, last = group[0], group[-1]
        old_range = _format_range_unified(first[1], last[2])
        new_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    if line[-1:] != b"\n":
                        # Mark a missing trailing newline the way git does.
                        line += b"\n\\ No newline at end of file\n"
                    yield b"-" + line
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    if line[-1:] != b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"+" + line
218def _get_sequence_matcher(algorithm: str, a: list[bytes], b: list[bytes]):
219 """Get appropriate sequence matcher for the given algorithm.
221 Args:
222 algorithm: Diff algorithm ("myers" or "patience")
223 a: First sequence
224 b: Second sequence
226 Returns:
227 Configured sequence matcher instance
229 Raises:
230 DiffAlgorithmNotAvailable: If patience requested but not available
231 """
232 if algorithm == "patience":
233 try:
234 from patiencediff import PatienceSequenceMatcher
236 return PatienceSequenceMatcher(None, a, b)
237 except ImportError:
238 raise DiffAlgorithmNotAvailable(
239 "patience", "Install with: pip install 'dulwich[patiencediff]'"
240 )
241 else:
242 return SequenceMatcher(a=a, b=b)
def unified_diff_with_algorithm(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: Optional[str] = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience")

    Returns:
      Generator yielding diff lines

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
    """
    if algorithm is None:
        algorithm = DEFAULT_DIFF_ALGORITHM

    matcher = _get_sequence_matcher(algorithm, a, b)

    def mark_no_newline(line: bytes) -> bytes:
        # Git marks a missing trailing newline with a backslash note.
        if line[-1:] != b"\n":
            line += b"\n\\ No newline at end of file\n"
        return line

    emitted_header = False
    for group in matcher.get_grouped_opcodes(n):
        if not emitted_header:
            # The ---/+++ header precedes only the first hunk.
            emitted_header = True
            from_date = f"\t{fromfiledate}" if fromfiledate else ""
            to_date = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{from_date}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{to_date}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        old_range = _format_range_unified(first[1], last[2])
        new_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{old_range} +{new_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    yield b"-" + mark_no_newline(line)
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    yield b"+" + mark_no_newline(line)
def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    # Only the leading FIRST_FEW_BYTES are inspected, mirroring git.
    head = content[:FIRST_FEW_BYTES]
    return head.find(b"\0") != -1
def shortid(hexsha: Optional[bytes]) -> bytes:
    """Get short object ID.

    Args:
      hexsha: Full hex SHA or None

    Returns:
      7-character short ID
    """
    # None stands for a nonexistent object and maps to the all-zero id.
    return b"0000000" if hexsha is None else hexsha[:7]
def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Generate patch filename.

    Args:
      p: Path or None
      root: Root directory

    Returns:
      Full patch filename
    """
    # None means the file does not exist on this side of the diff.
    if p is None:
        return b"/dev/null"
    return b"/".join([root, p])
def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    old_path, old_mode, old_id = old_file
    new_path, new_mode, new_id = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def as_blob(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        """Resolve (mode, sha) to a Blob, synthesizing one when needed."""
        if hexsha is None:
            # Nonexistent file: behave like an empty blob.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Submodule entry: git diffs show the pinned commit id.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def as_lines(blob: "Blob") -> list[bytes]:
        """Split blob content into lines (empty blob -> no lines)."""
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_blob = as_blob(old_mode, old_id)
    new_blob = as_blob(new_mode, new_id)
    if not diff_binary and (is_binary(old_blob.data) or is_binary(new_blob.data)):
        f.write(
            b"Binary files " + old_label + b" and " + new_label + b" differ\n"
        )
    else:
        f.writelines(
            unified_diff_with_algorithm(
                as_lines(old_blob),
                as_lines(new_blob),
                old_label,
                new_label,
                algorithm=diff_algorithm,
            )
        )
# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    old_path, new_path = paths
    old_mode, new_mode = modes
    old_sha, new_sha = shas
    # When one side is missing, mirror the other so both labels exist.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            # File was deleted; only the old mode is reported.
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    if new_mode is not None and old_mode is not None:
        # The mode suffix is only shown when the file exists on both sides.
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (elements None if nonexisting)
      new_file: (path, mode, blob) tuple (elements None if nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def lines(blob: Optional["Blob"]) -> list[bytes]:
        """Split blob content into lines (None -> no lines)."""
        if blob is not None:
            return blob.splitlines()
        else:
            return []

    # getattr covers blobs without an id as well as None blobs.
    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    old_contents = lines(old_blob)
    new_contents = lines(new_blob)
    f.writelines(
        unified_diff_with_algorithm(
            old_contents,
            new_contents,
            patched_old_path,
            patched_new_path,
            algorithm=diff_algorithm,
        )
    )
def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    # Emit one object diff per change reported by the store.
    for paths, modes, shas in store.tree_changes(old_tree, new_tree):
        (old_path, new_path) = paths
        (old_mode, new_mode) = modes
        (old_sha, new_sha) = shas
        write_object_diff(
            f,
            store,
            (old_path, old_mode, old_sha),
            (new_path, new_mode, new_sha),
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )
def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # Prefer the explicit argument, then the stream's encoding, then ASCII.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    # Pick the parser matching the payload type (bytes vs text stream).
    if isinstance(contents, bytes):
        msg = email.parser.BytesParser().parsebytes(contents)
    else:
        msg = email.parser.Parser().parsestr(contents)
    return parse_patch_message(msg, encoding)
def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # The mail sender is used for both author and committer; a "From: "
    # pseudo-header inside the body (handled below) can override the author.
    # NOTE(review): assumes the From and Subject headers are present —
    # msg["from"] would be None otherwise; confirm against callers.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        # No "[PATCH n/m]" tag; use the subject verbatim.
        subject = msg["subject"]
    else:
        # Strip everything up to and including the "] " closing the tag.
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    # One iterator is shared by all three loops below, so each loop resumes
    # exactly where the previous one stopped.
    line_iter = iter(lines)

    for line in line_iter:
        if line == b"---\n":
            # "---" separates the commit message from the diff.
            break
        if first:
            if line.startswith(b"From: "):
                # Body-level "From: " overrides the author from the header.
                c.author = line[len(b"From: ") :].rstrip()
            else:
                # First body line continues the message after a blank line.
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            # "-- " starts the signature/version trailer.
            break
        diff += line
    try:
        # The line after "-- " carries the git/dulwich version string.
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version