Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%
# patch.py -- For dealing with packed-style patches.
# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Classes for dealing with git am-style patches.

These patches are basically unified diffs with some extra metadata tacked
on.
"""

import email.parser
import time
from collections.abc import Generator
from difflib import SequenceMatcher
from typing import (
    TYPE_CHECKING,
    BinaryIO,
    Optional,
    TextIO,
    Union,
)

if TYPE_CHECKING:
    import email.message

    from .object_store import BaseObjectStore

from .objects import S_ISGITLINK, Blob, Commit

FIRST_FEW_BYTES = 8000


def write_commit_patch(
    f: BinaryIO,
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual file patch.

    Args:
      f: File-like object to write to
      commit: Commit object
      contents: Diff contents (the patch body)
      progress: tuple with current patch number and total
      version: Version string to write in the trailer; defaults to the
        Dulwich version
      encoding: Encoding to use for text
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        if encoding is None:
            encoding = "ascii"
        f.write(version.encode(encoding) + b"\n")
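

# Illustrative usage sketch (not part of the original module): format a single
# commit as a git-am style patch. The commit fields and diff text below are
# made-up example values; the tree id is the well-known empty-tree SHA-1.
def _example_write_commit_patch() -> None:
    from io import BytesIO

    c = Commit()
    c.tree = b"4b825dc642cb6eb9a060e54bf8d69288fbee4904"
    c.author = c.committer = b"Jane Doe <jane@example.com>"
    c.author_time = c.commit_time = 1700000000
    c.author_timezone = c.commit_timezone = 0
    c.message = b"Add greeting"
    buf = BytesIO()
    write_commit_patch(buf, c, b"diff --git a/greeting.txt b/greeting.txt\n", (1, 1))
    assert buf.getvalue().startswith(b"From " + c.id)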


def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string
    """
    decoded = commit.message.decode(errors="replace")
    return decoded.splitlines()[0].replace(" ", "-")
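

# Illustrative check (not part of the original module): the summary is the
# first message line with spaces replaced by dashes, suitable for a filename.
def _example_get_summary() -> None:
    c = Commit()
    c.message = b"Fix the widget frobnicator\n\nLonger description here.\n"
    assert get_summary(c) == "Fix-the-widget-frobnicator"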


# Unified Diff
def _format_range_unified(start: int, stop: int) -> str:
    """Convert range to the "ed" format."""
    # Per the diff spec at http://www.unix.org/single_unix_specification/
    beginning = start + 1  # lines start numbering with one
    length = stop - start
    if length == 1:
        return f"{beginning}"
    if not length:
        beginning -= 1  # empty ranges begin at line just before the range
    return f"{beginning},{length}"
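

# Illustrative checks (not part of the original module): hunk ranges as they
# appear in "@@ -<range> +<range> @@" headers of unified diffs.
def _example_format_range_unified() -> None:
    assert _format_range_unified(0, 1) == "1"    # a single line collapses to one number
    assert _format_range_unified(0, 3) == "1,3"  # three lines starting at line 1
    assert _format_range_unified(5, 5) == "5,0"  # empty range, anchored just before line 6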


def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file",
    as the original "git diff" does.

    Based on the same function in Python 2.7's difflib.py.
    """
    started = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not started:
            started = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        file1_range = _format_range_unified(first[1], last[2])
        file2_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    if not line[-1:] == b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"-" + line
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    if not line[-1:] == b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"+" + line
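

# Illustrative usage sketch (not part of the original module): diff two small
# byte-line sequences; the file names shown here are made-up examples.
def _example_unified_diff() -> None:
    old = [b"hello\n", b"world\n"]
    new = [b"hello\n", b"there\n", b"world\n"]
    diff = b"".join(unified_diff(old, new, b"a/greeting.txt", b"b/greeting.txt"))
    assert diff.startswith(b"--- a/greeting.txt\n+++ b/greeting.txt\n")
    assert b"+there\n" in diff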


def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    return b"\0" in content[:FIRST_FEW_BYTES]


def shortid(hexsha: Optional[bytes]) -> bytes:
    """Abbreviate a hex SHA to the seven characters used in diff index lines."""
    if hexsha is None:
        return b"0" * 7
    else:
        return hexsha[:7]


def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Return the patch-style filename for a path, or /dev/null if missing."""
    if p is None:
        return b"/dev/null"
    else:
        return root + b"/" + p
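

# Illustrative checks (not part of the original module) for the small helpers
# above; the hex SHA below is an arbitrary example value.
def _example_diff_helpers() -> None:
    assert is_binary(b"\x00\x01 binary blob")
    assert not is_binary(b"plain text\n")
    assert shortid(None) == b"0000000"
    assert shortid(b"3f786850e387550fdab836ed7e6dc881de23001b") == b"3f78685"
    assert patch_filename(None, b"a") == b"/dev/null"
    assert patch_filename(b"docs/readme.txt", b"b") == b"b/docs/readme.txt"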


def write_object_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        from typing import cast

        if hexsha is None:
            return cast(Blob, Blob.from_string(b""))
        elif mode is not None and S_ISGITLINK(mode):
            return cast(Blob, Blob.from_string(b"Subproject commit " + hexsha + b"\n"))
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return cast(Blob, Blob.from_string(obj.as_raw_string()))

    def lines(content: "Blob") -> list[bytes]:
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
            )
        )
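

# Illustrative usage sketch (not part of the original module): diff two blobs
# stored in an in-memory object store. MemoryObjectStore comes from
# dulwich.object_store; the path and mode are made-up example values.
def _example_write_object_diff() -> None:
    from io import BytesIO

    from dulwich.object_store import MemoryObjectStore

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"one\n")
    new_blob = Blob.from_string(b"one\ntwo\n")
    store.add_object(old_blob)
    store.add_object(new_blob)
    buf = BytesIO()
    write_object_diff(
        buf,
        store,
        (b"notes.txt", 0o100644, old_blob.id),
        (b"notes.txt", 0o100644, new_blob.id),
    )
    assert b"+two\n" in buf.getvalue()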


# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is not None:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
        else:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
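

# Illustrative check (not part of the original module): header for a file whose
# mode changes from 100644 to 100755; the SHAs are arbitrary example values.
def _example_gen_diff_header() -> None:
    header = b"".join(
        gen_diff_header(
            (b"script.sh", b"script.sh"),
            (0o100644, 0o100755),
            (
                b"3f786850e387550fdab836ed7e6dc881de23001b",
                b"89e6c98d92887913cadf06b2adb97f26cde4849b",
            ),
        )
    )
    assert header == (
        b"diff --git a/script.sh b/script.sh\n"
        b"old file mode 100644\n"
        b"new file mode 100755\n"
        b"index 3f78685..89e6c98 100755\n"
    )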


# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: BinaryIO,
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None if nonexisting)
      new_file: (path, mode, blob) tuple (None if nonexisting)

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def lines(blob: Optional["Blob"]) -> list[bytes]:
        if blob is not None:
            return blob.splitlines()
        else:
            return []

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    old_contents = lines(old_blob)
    new_contents = lines(new_blob)
    f.writelines(
        unified_diff(old_contents, new_contents, patched_old_path, patched_new_path)
    )
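

# Illustrative usage sketch (not part of the original module): diff two Blob
# objects directly, without going through an object store.
def _example_write_blob_diff() -> None:
    from io import BytesIO

    buf = BytesIO()
    write_blob_diff(
        buf,
        (b"notes.txt", 0o100644, Blob.from_string(b"one\n")),
        (b"notes.txt", 0o100644, Blob.from_string(b"one\ntwo\n")),
    )
    assert b"--- a/notes.txt" in buf.getvalue()
    assert b"+two\n" in buf.getvalue()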


def write_tree_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Store to retrieve objects from, if necessary
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
    """
    changes = store.tree_changes(old_tree, new_tree)
    for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes:
        write_object_diff(
            f,
            store,
            (oldpath, oldmode, oldsha),
            (newpath, newmode, newsha),
            diff_binary=diff_binary,
        )
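

# Illustrative usage sketch (not part of the original module): build two
# single-file trees in an in-memory store and diff them. MemoryObjectStore and
# Tree come from dulwich; the path and contents are made-up example values.
def _example_write_tree_diff() -> None:
    from io import BytesIO

    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Tree

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"one\n")
    new_blob = Blob.from_string(b"one\ntwo\n")
    old_tree = Tree()
    old_tree.add(b"notes.txt", 0o100644, old_blob.id)
    new_tree = Tree()
    new_tree.add(b"notes.txt", 0o100644, new_blob.id)
    for obj in (old_blob, new_blob, old_tree, new_tree):
        store.add_object(obj)
    buf = BytesIO()
    write_tree_diff(buf, store, old_tree.id, new_tree.id)
    assert b"+two\n" in buf.getvalue()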


def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    encoding = encoding or "ascii"
    contents = f.read()
    if isinstance(contents, bytes):
        bparser = email.parser.BytesParser()
        msg = bparser.parsebytes(contents)
    else:
        uparser = email.parser.Parser()
        msg = uparser.parsestr(contents)
    return parse_patch_message(msg, encoding)


def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version
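

# Illustrative usage sketch (not part of the original module): split a minimal
# git format-patch style message; the author, subject and diff are made-up
# example values.
def _example_git_am_patch_split() -> None:
    from io import BytesIO

    patch_text = (
        b"From: Jane Doe <jane@example.com>\n"
        b"Subject: [PATCH 1/1] Add greeting\n"
        b"\n"
        b"---\n"
        b"diff --git a/greeting.txt b/greeting.txt\n"
        b"--- /dev/null\n"
        b"+++ b/greeting.txt\n"
        b"@@ -0,0 +1 @@\n"
        b"+hello\n"
        b"-- \n"
        b"2.39.0\n"
    )
    commit, diff, version = git_am_patch_split(BytesIO(patch_text))
    assert commit.message == b"Add greeting\n"
    assert diff.startswith(b"diff --git a/greeting.txt b/greeting.txt\n")
    assert version == b"2.39.0"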