Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

251 statements  

1# patch.py -- For dealing with packed-style patches. 

2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Classes for dealing with git am-style patches. 

23 

24These patches are basically unified diffs with some extra metadata tacked 

25on. 

26""" 

27 

28import email.parser 

29import time 

30from collections.abc import Generator, Sequence 

31from difflib import SequenceMatcher 

32from typing import ( 

33 IO, 

34 TYPE_CHECKING, 

35 BinaryIO, 

36 Optional, 

37 TextIO, 

38 Union, 

39) 

40 

41if TYPE_CHECKING: 

42 import email.message 

43 

44 from .object_store import BaseObjectStore 

45 

46from .objects import S_ISGITLINK, Blob, Commit 

47 

48FIRST_FEW_BYTES = 8000 

49 

50DEFAULT_DIFF_ALGORITHM = "myers" 

51 

52 

class DiffAlgorithmNotAvailable(Exception):
    """Error signalling that a requested diff algorithm cannot be used."""

    def __init__(self, algorithm: str, install_hint: str = "") -> None:
        """Build the exception and its human-readable message.

        Args:
          algorithm: Name of the diff algorithm that was requested
          install_hint: Optional text appended to the message when non-empty
        """
        self.algorithm = algorithm
        self.install_hint = install_hint
        message = f"Diff algorithm '{algorithm}' requested but not available."
        if install_hint:
            # The hint follows the base sentence, separated by one space.
            message = f"{message} {install_hint}"
        super().__init__(message)

73 

74 

def write_commit_patch(
    f: IO[bytes],
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write a single commit as a git-am-style patch.

    The output consists of a ``From <id> <date>`` separator line, ``From:``,
    ``Date:`` and ``Subject:`` headers, a ``---`` marker, the output of the
    external ``diffstat`` tool (when it can be started), the patch contents
    and a ``-- `` trailer followed by a version line.

    Args:
      f: File-like object to write to
      commit: Commit object
      contents: Contents of the patch
      progress: tuple with current patch number and total.
      version: Version string to include in patch trailer; the Dulwich
        version is used when omitted
      encoding: Encoding to use for the patch; defaults to the ``encoding``
        attribute of ``f`` if it has one, otherwise ASCII
    """
    # getattr() can return None (or an empty string) when the file object has
    # an unset ``encoding`` attribute, so fall back to ASCII in that case too.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    # NOTE: strftime() without a time tuple formats the *current* local time,
    # so this header reflects when the patch was written, not the commit.
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        f.write(version.encode(encoding) + b"\n")

140 

141 

def get_summary(commit: "Commit") -> str:
    """Derive a filename-friendly summary from a commit message.

    Args:
      commit: Commit whose message is summarised
    Returns: First line of the decoded message with spaces replaced by
      dashes, or an empty string when the message contains no lines
    """
    # Undecodable bytes become replacement characters instead of raising.
    text = commit.message.decode(errors="replace")
    first_line = next(iter(text.splitlines()), "")
    return first_line.replace(" ", "-")

152 

153 

154# Unified Diff 

def _format_range_unified(start: int, stop: int) -> str:
    """Render the zero-based half-open range [start, stop) for a hunk header.

    Returns just the one-based line number for a single-line range, and
    ``"<first>,<count>"`` otherwise.
    """
    # Per the diff spec at http://www.unix.org/single_unix_specification/
    count = stop - start
    if count == 1:
        return f"{start + 1}"
    # Lines are numbered from one, except that an empty range is reported at
    # the line just before it.
    first = start + 1 if count else start
    return f"{first},{count}"

165 

166 

def unified_diff(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python2.7 difflib.py

    Args:
      a: Lines of the old content
      b: Lines of the new content
      fromfile: Name shown on the ``---`` header line
      tofile: Name shown on the ``+++`` header line
      fromfiledate: Optional date appended (tab-separated) to the ``---`` line
      tofiledate: Optional date appended (tab-separated) to the ``+++`` line
      n: Number of context lines
      lineterm: Terminator appended to the file header and hunk header lines
      tree_encoding: Encoding used to decode ``fromfile`` and ``tofile``
      output_encoding: Encoding used for the header lines that are generated

    Returns:
      Generator yielding the diff as byte strings; nothing is yielded when
      the matcher reports no change groups
    """
    no_newline_marker = b"\n\\ No newline at end of file\n"
    header_written = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not header_written:
            # The file header is emitted once, before the first hunk.
            header_written = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        file1_range = _format_range_unified(first[1], last[2])
        file2_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                sides = [(b" ", a[i1:i2])]
            else:
                sides = []
                if tag in ("replace", "delete"):
                    sides.append((b"-", a[i1:i2]))
                if tag in ("replace", "insert"):
                    sides.append((b"+", b[j1:j2]))
            for prefix, chunk in sides:
                for line in chunk:
                    # A line lacking its newline (only possible for the last
                    # line of a file) is terminated and flagged. This applies
                    # to context lines too; otherwise the output would end
                    # mid-line and run into whatever is written next.
                    if not line.endswith(b"\n"):
                        line += no_newline_marker
                    yield prefix + line

216 

217 

def _get_sequence_matcher(
    algorithm: str, a: Sequence[bytes], b: Sequence[bytes]
) -> SequenceMatcher[bytes]:
    """Build the sequence matcher that implements the requested algorithm.

    Args:
      algorithm: Diff algorithm name; "patience" selects the patiencediff
        matcher, any other value selects difflib's SequenceMatcher
      a: First sequence
      b: Second sequence

    Returns:
      Matcher instance configured to compare ``a`` with ``b``

    Raises:
      DiffAlgorithmNotAvailable: If patience is requested but the
        patiencediff module cannot be imported
    """
    if algorithm != "patience":
        return SequenceMatcher(a=a, b=b)

    # patiencediff is an optional dependency, hence the lazy import.
    try:
        from patiencediff import PatienceSequenceMatcher

        return PatienceSequenceMatcher(None, a, b)  # type: ignore[no-any-return,unused-ignore]
    except ImportError:
        raise DiffAlgorithmNotAvailable(
            "patience", "Install with: pip install 'dulwich[patiencediff]'"
        )

245 

246 

def unified_diff_with_algorithm(
    a: Sequence[bytes],
    b: Sequence[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
    algorithm: Optional[str] = None,
) -> Generator[bytes, None, None]:
    """Generate unified diff with specified algorithm.

    Args:
      a: First sequence of lines
      b: Second sequence of lines
      fromfile: Name of first file
      tofile: Name of second file
      fromfiledate: Date of first file
      tofiledate: Date of second file
      n: Number of context lines
      lineterm: Line terminator for the file header and hunk header lines
      tree_encoding: Encoding for tree paths
      output_encoding: Encoding for output
      algorithm: Diff algorithm to use ("myers" or "patience"); the module
        default is used when None

    Returns:
      Generator yielding diff lines; nothing is yielded when the matcher
      reports no change groups

    Raises:
      DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
    """
    if algorithm is None:
        algorithm = DEFAULT_DIFF_ALGORITHM

    matcher = _get_sequence_matcher(algorithm, a, b)

    no_newline_marker = b"\n\\ No newline at end of file\n"
    header_written = False
    for group in matcher.get_grouped_opcodes(n):
        if not header_written:
            # The file header is emitted once, before the first hunk.
            header_written = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        file1_range = _format_range_unified(first[1], last[2])
        file2_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                sides = [(b" ", a[i1:i2])]
            else:
                sides = []
                if tag in ("replace", "delete"):
                    sides.append((b"-", a[i1:i2]))
                if tag in ("replace", "insert"):
                    sides.append((b"+", b[j1:j2]))
            for prefix, chunk in sides:
                for line in chunk:
                    # A line lacking its newline (only possible for the last
                    # line of a file) is terminated and flagged. This applies
                    # to context lines too; otherwise the output would end
                    # mid-line and run into whatever is written next.
                    if not line.endswith(b"\n"):
                        line += no_newline_marker
                    yield prefix + line

319 

320 

def is_binary(content: bytes) -> bool:
    """Report whether the leading bytes of ``content`` contain a NUL byte.

    Only the first FIRST_FEW_BYTES bytes are inspected.

    Args:
      content: Bytestring to check for binary content
    """
    # A bounded search looks at the same window as slicing, without the copy.
    return content.find(b"\0", 0, FIRST_FEW_BYTES) != -1

328 

329 

def shortid(hexsha: Optional[bytes]) -> bytes:
    """Abbreviate an object ID to its first seven characters.

    Args:
      hexsha: Full hex SHA, or None for a missing object

    Returns:
      The first 7 bytes of ``hexsha``, or seven ``0`` characters for None
    """
    if hexsha is not None:
        return hexsha[:7]
    return b"0" * 7

343 

344 

def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Build the path shown in a patch for one side of a diff.

    Args:
      p: Path, or None when that side does not exist
      root: Prefix placed in front of the path

    Returns:
      ``root`` and ``p`` joined with a slash, or ``/dev/null`` for None
    """
    if p is not None:
        return b"/".join((root, p))
    return b"/dev/null"

359 

360 

def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write the diff between two objects held in an object store.

    The diff header is written first, followed by either a
    "Binary files ... differ" line or a unified diff of the contents.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: the tuple elements should be None for nonexistent files
    """
    old_path, old_mode, old_id = old_file
    new_path, new_mode, new_id = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def load_blob(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        """Return the content to diff for one side as a Blob.

        Args:
          mode: File mode
          hexsha: Object SHA

        Returns:
          Blob object
        """
        if hexsha is None:
            # Nonexistent side: diff against empty content.
            return Blob.from_string(b"")
        if mode is not None and S_ISGITLINK(mode):
            # Gitlinks are rendered as a one-line pseudo file.
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        obj = store[hexsha]
        if isinstance(obj, Blob):
            return obj
        # Fallback for non-blob objects
        return Blob.from_string(obj.as_raw_string())

    def split_lines(blob: "Blob") -> list[bytes]:
        """Return the lines of ``blob``, or an empty list if it is falsy."""
        return blob.splitlines() if blob else []

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_blob = load_blob(old_mode, old_id)
    new_blob = load_blob(new_mode, new_id)
    if diff_binary or not (is_binary(old_blob.data) or is_binary(new_blob.data)):
        f.writelines(
            unified_diff_with_algorithm(
                split_lines(old_blob),
                split_lines(new_blob),
                old_label,
                new_label,
                algorithm=diff_algorithm,
            )
        )
    else:
        f.write(
            b"".join(
                (b"Binary files ", old_label, b" and ", new_label, b" differ\n")
            )
        )

447 

448 

449# TODO(jelmer): Support writing unicode, rather than bytes. 

def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Generate the header that precedes a blob diff.

    Yields the ``diff --git`` line, any file mode lines, and the ``index``
    line (the latter in several pieces).

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    old_path, new_path = paths
    old_mode, new_mode = modes
    old_sha, new_sha = shas

    # A missing path on one side is filled in from the other side.
    if old_path is None:
        old_path = new_path
    if new_path is None:
        new_path = old_path
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")
    yield b"diff --git " + old_label + b" " + new_label + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")

    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    # The mode is appended to the index line only when both sides have one.
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"

484 

485 

486# TODO(jelmer): Support writing unicode, rather than bytes. 

def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write the diff header and unified diff between two blobs.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None if nonexisting)
      new_file: (path, mode, blob) tuple (None if nonexisting)
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")

    Note: The use of write_object_diff is recommended over this function.
    """
    old_path, old_mode, old_blob = old_file
    new_path, new_mode, new_blob = new_file
    old_label = patch_filename(old_path, b"a")
    new_label = patch_filename(new_path, b"b")

    def split_lines(blob: Optional["Blob"]) -> list[bytes]:
        """Return the lines of ``blob``, or an empty list for None."""
        return [] if blob is None else blob.splitlines()

    # A missing blob has no ``id`` attribute; None is passed on instead.
    shas = (getattr(old_blob, "id", None), getattr(new_blob, "id", None))
    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), shas)
    )
    f.writelines(
        unified_diff_with_algorithm(
            split_lines(old_blob),
            split_lines(new_blob),
            old_label,
            new_label,
            algorithm=diff_algorithm,
        )
    )

540 

541 

def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
    diff_algorithm: Optional[str] = None,
) -> None:
    """Write the diff of every change between two trees.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
    """
    for change in store.tree_changes(old_tree, new_tree):
        # Each change is a (paths, modes, shas) triple of (old, new) pairs.
        (old_path, new_path), (old_mode, new_mode), (old_sha, new_sha) = change
        old_entry = (old_path, old_mode, old_sha)
        new_entry = (new_path, new_mode, new_sha)
        write_object_diff(
            f,
            store,
            old_entry,
            new_entry,
            diff_binary=diff_binary,
            diff_algorithm=diff_algorithm,
        )

571 

572 

def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Read a git-am-style patch and split it into its parts.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects; defaults to the
        ``encoding`` attribute of ``f`` if set, otherwise ASCII
    Returns: Tuple with commit object, diff contents and git version
    """
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    raw = f.read()
    # Pick the parser matching what the file object returned.
    if isinstance(raw, bytes):
        msg = email.parser.BytesParser().parsebytes(raw)
    else:
        msg = email.parser.Parser().parsestr(raw)
    return parse_patch_message(msg, encoding)

593 

594 

def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    The author and committer come from the ``From`` header and the first
    message line from the ``Subject`` header, with a leading ``[PATCH ...] ``
    tag removed. Body lines up to a ``---`` line extend the commit message;
    a ``From: `` line at the very start of the body overrides the author.
    Lines after ``---`` up to a ``-- `` line form the diff, and the line
    following ``-- `` is returned as the version.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits (ASCII if None)
    Returns: Tuple with commit object, diff contents and git version
      (None when nothing follows the ``-- `` line)
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    sender = msg["from"].encode(encoding)
    c.author = sender
    c.committer = sender

    subject = msg["subject"]
    tag_start = subject.find("[PATCH")
    if tag_start != -1:
        tag_end = subject.find("] ", tag_start)
        # A tag without a closing "] " is left in place rather than raising
        # ValueError on a malformed subject.
        if tag_end != -1:
            subject = subject[tag_end + 2 :]

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if not isinstance(body, bytes):
        # Handle other types by converting to string first
        body = str(body).encode(encoding)
    line_iter = iter(body.splitlines(True))

    # Collect the pieces and join once; repeated bytes concatenation would
    # copy the accumulated data on every line.
    message_parts = [(subject.replace("\n", "") + "\n").encode(encoding)]
    first = True
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                message_parts.append(b"\n" + line)
            first = False
        else:
            message_parts.append(line)
    c.message = b"".join(message_parts)

    diff_parts = []
    for line in line_iter:
        if line == b"-- \n":
            break
        diff_parts.append(line)
    diff = b"".join(diff_parts)

    version_line = next(line_iter, None)
    version = None if version_line is None else version_line.rstrip(b"\n")
    return c, diff, version