Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# patch.py -- For dealing with packed-style patches.
2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
3#
4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
6# General Public License as published by the Free Software Foundation; version 2.0
7# or (at your option) any later version. You can redistribute it and/or
8# modify it under the terms of either of these two licenses.
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16# You should have received a copy of the licenses; if not, see
17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
19# License, Version 2.0.
20#
22"""Classes for dealing with git am-style patches.
24These patches are basically unified diffs with some extra metadata tacked
25on.
26"""
28import email.parser
29import time
30from collections.abc import Generator
31from difflib import SequenceMatcher
32from typing import (
33 TYPE_CHECKING,
34 BinaryIO,
35 Optional,
36 TextIO,
37 Union,
38)
40if TYPE_CHECKING:
41 import email.message
43 from .object_store import BaseObjectStore
45from .objects import S_ISGITLINK, Blob, Commit
47FIRST_FEW_BYTES = 8000
def write_commit_patch(
    f: BinaryIO,
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual git-am-style patch for a single commit.

    Args:
      f: Binary file-like object to write the patch to
      commit: Commit object the patch describes
      contents: Diff contents for the commit (str or bytes)
      progress: Tuple with current patch number and total number of patches
      version: Version string for the trailer; defaults to the running
        Dulwich version
      encoding: Encoding for the headers; defaults to the file's encoding
        or "ascii"
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        # A file object may carry an explicit encoding attribute of None.
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    # Best effort: include a diffstat summary if the external tool exists.
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        # encoding is guaranteed non-None here; the old re-check was dead code.
        f.write(version.encode(encoding) + b"\n")
def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string (first message line, spaces replaced by dashes)
    """
    text = commit.message.decode(errors="replace")
    for first_line in text.splitlines():
        return first_line.replace(" ", "-")
    return ""
125# Unified Diff
def _format_range_unified(start: int, stop: int) -> str:
    """Convert range to the "ed" format."""
    # Per the diff spec at http://www.unix.org/single_unix_specification/
    # line numbering starts at one.
    span = stop - start
    if span == 1:
        return str(start + 1)
    # Empty ranges are reported at the line just before the range.
    anchor = start if span == 0 else start + 1
    return f"{anchor},{span}"
def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as
    original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def fmt_range(start: int, stop: int) -> str:
        # "ed"-style range: 1-based; an empty range points at the line before it.
        span = stop - start
        if span == 1:
            return f"{start + 1}"
        return f"{start if span == 0 else start + 1},{span}"

    def mark_missing_newline(line: bytes) -> bytes:
        # git appends this marker when the content lacks a final newline.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    wrote_header = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not wrote_header:
            wrote_header = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        head, tail = group[0], group[-1]
        hunk = (
            f"@@ -{fmt_range(head[1], tail[2])} "
            f"+{fmt_range(head[3], tail[4])} @@{lineterm}"
        )
        yield hunk.encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
            else:
                if tag != "insert":  # replace or delete: emit old lines
                    for line in a[i1:i2]:
                        yield b"-" + mark_missing_newline(line)
                if tag != "delete":  # replace or insert: emit new lines
                    for line in b[j1:j2]:
                        yield b"+" + mark_missing_newline(line)
def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    prefix = content[:FIRST_FEW_BYTES]
    return prefix.find(b"\0") != -1
def shortid(hexsha: Optional[bytes]) -> bytes:
    """Abbreviate a hex SHA to its 7-byte short form.

    A missing SHA (None) maps to seven zeros, matching git's placeholder.
    """
    return b"0" * 7 if hexsha is None else hexsha[:7]
def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Prefix a path with a diff root ("a"/"b"); None maps to /dev/null."""
    if p is None:
        return b"/dev/null"
    return b"/".join([root, p])
def write_object_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().

    Note: the tuple elements should be None for nonexistent files
    """
    from typing import cast

    old_path, old_mode, old_id = old_file
    new_path, new_mode, new_id = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def _blob_for(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        # Materialize blob-like content for every case so diffing is uniform.
        if hexsha is None:
            return cast(Blob, Blob.from_string(b""))
        if mode is not None and S_ISGITLINK(mode):
            # Submodules are rendered as a synthetic one-line blob.
            return cast(Blob, Blob.from_string(b"Subproject commit " + hexsha + b"\n"))
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return cast(Blob, Blob.from_string(obj.as_raw_string()))

    def _split(blob: "Blob") -> list[bytes]:
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_blob = _blob_for(old_mode, old_id)
    new_blob = _blob_for(new_mode, new_id)
    if diff_binary or not (is_binary(old_blob.data) or is_binary(new_blob.data)):
        f.writelines(
            unified_diff(
                _split(old_blob),
                _split(new_blob),
                old_label,
                new_label,
            )
        )
    else:
        f.write(
            b"Binary files " + old_label + b" and " + new_label + b" differ\n"
        )
# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    old_path, new_path = paths
    old_mode, new_mode = modes
    old_sha, new_sha = shas
    # A create/delete has only one real path; mirror it to the other side.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")
    yield b"diff --git " + old_label + b" " + new_label + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            # Mode present only on the old side means the file was deleted.
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    if old_mode is not None and new_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: BinaryIO,
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None entries if nonexisting)
      new_file: (path, mode, blob) tuple (None entries if nonexisting)

    Note: The use of write_object_diff is recommended over this function.
    """
    old_path, old_mode, old_blob = old_file
    new_path, new_mode, new_blob = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def _content_lines(blob: Optional["Blob"]) -> list[bytes]:
        # Missing blobs diff as empty content.
        return [] if blob is None else blob.splitlines()

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff(
            _content_lines(old_blob),
            _content_lines(new_blob),
            old_label,
            new_label,
        )
    )
def write_tree_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
    """
    # Emit one object diff per change reported by the store.
    for change in store.tree_changes(old_tree, new_tree):
        (old_name, new_name), (old_mode, new_mode), (old_sha, new_sha) = change
        write_object_diff(
            f,
            store,
            (old_name, old_mode, old_sha),
            (new_name, new_mode, new_sha),
            diff_binary=diff_binary,
        )
def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    data = f.read()
    # Pick the parser matching the stream's mode (binary vs text).
    if isinstance(data, bytes):
        msg = email.parser.BytesParser().parsebytes(data)
    else:
        msg = email.parser.Parser().parsestr(data)
    return parse_patch_message(msg, encoding)
def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    if encoding is None:
        encoding = "ascii"
    commit = Commit()
    commit.author = msg["from"].encode(encoding)
    commit.committer = msg["from"].encode(encoding)

    # Strip a leading "[PATCH n/m] " tag from the subject when present.
    subject = msg["subject"]
    try:
        tag_start = subject.index("[PATCH")
    except ValueError:
        pass
    else:
        subject = subject[subject.index("] ", tag_start) + 2 :]
    commit.message = (subject.replace("\n", "") + "\n").encode(encoding)

    payload = msg.get_payload(decode=True)
    if isinstance(payload, str):
        payload = payload.encode(encoding)
    if isinstance(payload, bytes):
        body_lines = payload.splitlines(True)
    else:
        # Handle other types by converting to string first
        body_lines = str(payload).encode(encoding).splitlines(True)
    line_iter = iter(body_lines)

    # Body up to "---" extends the commit message; a leading "From: "
    # overrides the author taken from the mail headers.
    is_first_line = True
    for line in line_iter:
        if line == b"---\n":
            break
        if is_first_line:
            if line.startswith(b"From: "):
                commit.author = line[len(b"From: ") :].rstrip()
            else:
                commit.message += b"\n" + line
            is_first_line = False
        else:
            commit.message += line

    # Everything until the "-- " signature separator is the diff itself.
    diff_chunks = []
    for line in line_iter:
        if line == b"-- \n":
            break
        diff_chunks.append(line)

    # The line after the separator, if any, names the git version.
    try:
        version: Optional[bytes] = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return commit, b"".join(diff_chunks), version