# patch.py -- For dealing with packed-style patches.
# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Classes for dealing with git am-style patches.

These patches are basically unified diffs with some extra metadata tacked
on.
"""

import email.parser
import time
from collections.abc import Generator
from difflib import SequenceMatcher
from typing import (
    IO,
    TYPE_CHECKING,
    BinaryIO,
    Optional,
    TextIO,
    Union,
)

if TYPE_CHECKING:
    import email.message

    from .object_store import BaseObjectStore

from .objects import S_ISGITLINK, Blob, Commit

FIRST_FEW_BYTES = 8000


def write_commit_patch(
    f: IO[bytes],
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual file patch.

    Args:
      f: File-like object to write to
      commit: Commit object
      contents: Contents of the patch
      progress: Tuple with current patch number and total
      version: Version string to include in the patch header
      encoding: Encoding to use for the patch
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        if encoding is None:
            encoding = "ascii"
        f.write(version.encode(encoding) + b"\n")
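

# Illustrative usage sketch (added for this listing, not part of dulwich):
# build a throwaway Commit in memory and render it as a git-am style patch.
# All values below (author, timestamps, diff body) are made up for the demo;
# only the empty-tree SHA is a real, well-known object id.
def _example_write_commit_patch() -> None:
    """Write a single-patch series for a synthetic commit into a buffer."""
    from io import BytesIO

    c = Commit()
    c.tree = b"4b825dc642cb6eb9a060e54bf8d69288fbee4904"  # the empty tree
    c.author = c.committer = b"Jane Doe <jane@example.com>"
    c.author_time = c.commit_time = 1700000000
    c.author_timezone = c.commit_timezone = 0
    c.message = b"Fix a typo in the README"

    buf = BytesIO()
    # Normally this diff body would come from write_object_diff/write_tree_diff.
    diff_body = b"diff --git a/README b/README\n"
    write_commit_patch(buf, c, diff_body, (1, 1))
    print(buf.getvalue().decode("utf-8", "replace"))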


def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string
    """
    decoded = commit.message.decode(errors="replace")
    lines = decoded.splitlines()
    return lines[0].replace(" ", "-") if lines else ""
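

# Illustrative usage sketch (not part of dulwich): get_summary only looks at
# the commit message, so a bare Commit with a message set is enough here.
def _example_get_summary() -> None:
    """Show how a commit message maps to a patch filename summary."""
    c = Commit()
    c.message = b"Add unified diff support\n\nLonger description here."
    assert get_summary(c) == "Add-unified-diff-support"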


# Unified Diff
def _format_range_unified(start: int, stop: int) -> str:
    """Convert range to the "ed" format."""
    # Per the diff spec at http://www.unix.org/single_unix_specification/
    beginning = start + 1  # lines start numbering with one
    length = stop - start
    if length == 1:
        return f"{beginning}"
    if not length:
        beginning -= 1  # empty ranges begin at line just before the range
    return f"{beginning},{length}"


def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """
    started = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not started:
            started = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        file1_range = _format_range_unified(first[1], last[2])
        file2_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    if not line[-1:] == b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"-" + line
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    if not line[-1:] == b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"+" + line
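

# Illustrative usage sketch (not part of dulwich): unified_diff works on lists
# of byte strings and yields encoded diff lines, including the
# "\ No newline at end of file" marker when a final newline is missing.
def _example_unified_diff() -> None:
    """Print a small unified diff between two line lists."""
    old = [b"one\n", b"two\n", b"three\n"]
    new = [b"one\n", b"2\n", b"three"]  # no trailing newline on the last line
    for chunk in unified_diff(
        old, new, fromfile=b"a/numbers.txt", tofile=b"b/numbers.txt"
    ):
        print(chunk.decode("utf-8"), end="")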


def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    return b"\0" in content[:FIRST_FEW_BYTES]


def shortid(hexsha: Optional[bytes]) -> bytes:
    """Get short object ID.

    Args:
      hexsha: Full hex SHA or None

    Returns:
      7-character short ID
    """
    if hexsha is None:
        return b"0" * 7
    else:
        return hexsha[:7]


def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Generate patch filename.

    Args:
      p: Path or None
      root: Root directory

    Returns:
      Full patch filename
    """
    if p is None:
        return b"/dev/null"
    else:
        return root + b"/" + p


def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        """Get blob content for a file.

        Args:
          mode: File mode
          hexsha: Object SHA

        Returns:
          Blob object
        """
        if hexsha is None:
            return Blob.from_string(b"")
        elif mode is not None and S_ISGITLINK(mode):
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return Blob.from_string(obj.as_raw_string())

    def lines(content: "Blob") -> list[bytes]:
        """Split blob content into lines.

        Args:
          content: Blob content

        Returns:
          List of lines
        """
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
            )
        )
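

# Illustrative usage sketch (not part of dulwich): write_object_diff looks the
# blobs up by SHA, so this example stores them in a MemoryObjectStore first.
# Paths, modes and file contents are invented for the demonstration.
def _example_write_object_diff() -> None:
    """Diff two stored blobs by (path, mode, sha) and print the result."""
    import sys

    from dulwich.object_store import MemoryObjectStore

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"print('hello')\n")
    new_blob = Blob.from_string(b"print('hello, world')\n")
    store.add_object(old_blob)
    store.add_object(new_blob)
    write_object_diff(
        sys.stdout.buffer,
        store,
        (b"hello.py", 0o100644, old_blob.id),
        (b"hello.py", 0o100644, new_blob.id),
    )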


# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is not None:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
        else:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
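

# Illustrative usage sketch (not part of dulwich): gen_diff_header is a
# generator, so the header lines can simply be collected into a list. The
# path and contents below are invented; passing None for the old side
# produces a "new file mode" header.
def _example_gen_diff_header() -> list[bytes]:
    """Return the header lines for a hypothetical new-file diff."""
    blob = Blob.from_string(b"new content\n")
    return list(
        gen_diff_header(
            paths=(None, b"docs/notes.txt"),
            modes=(None, 0o100644),
            shas=(None, blob.id),
        )
    )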


# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None if nonexistent)
      new_file: (path, mode, blob) tuple (None if nonexistent)

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def lines(blob: Optional["Blob"]) -> list[bytes]:
        """Split blob content into lines.

        Args:
          blob: Blob object or None

        Returns:
          List of lines
        """
        if blob is not None:
            return blob.splitlines()
        else:
            return []

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    old_contents = lines(old_blob)
    new_contents = lines(new_blob)
    f.writelines(
        unified_diff(old_contents, new_contents, patched_old_path, patched_new_path)
    )
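

# Illustrative usage sketch (not part of dulwich): write_blob_diff takes the
# Blob objects directly instead of looking them up in an object store.
# The path and contents here are invented for the demonstration.
def _example_write_blob_diff() -> None:
    """Diff two in-memory blobs and print the result."""
    import sys

    old_blob = Blob.from_string(b"colour\n")
    new_blob = Blob.from_string(b"color\n")
    write_blob_diff(
        sys.stdout.buffer,
        (b"spelling.txt", 0o100644, old_blob),
        (b"spelling.txt", 0o100644, new_blob),
    )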


def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
    """
    changes = store.tree_changes(old_tree, new_tree)
    for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes:
        write_object_diff(
            f,
            store,
            (oldpath, oldmode, oldsha),
            (newpath, newmode, newsha),
            diff_binary=diff_binary,
        )
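

# Illustrative usage sketch (not part of dulwich): write_tree_diff walks the
# changes between two tree SHAs, so both trees and their blobs must already
# be present in the store. File names and contents are invented for the demo.
def _example_write_tree_diff() -> None:
    """Build two one-file trees in memory and print the diff between them."""
    import sys

    from dulwich.object_store import MemoryObjectStore
    from dulwich.objects import Tree

    store = MemoryObjectStore()
    old_blob = Blob.from_string(b"version 1\n")
    new_blob = Blob.from_string(b"version 2\n")
    old_tree = Tree()
    old_tree.add(b"file.txt", 0o100644, old_blob.id)
    new_tree = Tree()
    new_tree.add(b"file.txt", 0o100644, new_blob.id)
    for obj in (old_blob, new_blob, old_tree, new_tree):
        store.add_object(obj)
    write_tree_diff(sys.stdout.buffer, store, old_tree.id, new_tree.id)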


def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    encoding = encoding or "ascii"
    contents = f.read()
    if isinstance(contents, bytes):
        bparser = email.parser.BytesParser()
        msg = bparser.parsebytes(contents)
    else:
        uparser = email.parser.Parser()
        msg = uparser.parsestr(contents)
    return parse_patch_message(msg, encoding)


def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version
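

# Illustrative usage sketch (not part of dulwich): round-trip a minimal,
# hand-written git-am style patch through git_am_patch_split, which delegates
# to parse_patch_message. The author, subject and diff body are made up.
def _example_git_am_patch_split() -> None:
    """Parse a tiny hand-written patch and inspect the pieces."""
    from io import BytesIO

    patch = (
        b"From: Jane Doe <jane@example.com>\n"
        b"Subject: [PATCH 1/1] Fix a typo\n"
        b"\n"
        b"---\n"
        b"diff --git a/README b/README\n"
        b"-- \n"
        b"2.43.0\n"
    )
    commit, diff, version = git_am_patch_split(BytesIO(patch))
    assert commit.author == b"Jane Doe <jane@example.com>"
    assert commit.message == b"Fix a typo\n"
    assert diff.startswith(b"diff --git")
    assert version == b"2.43.0"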