Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%

204 statements  

# patch.py -- For dealing with packed-style patches.
# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Classes for dealing with git am-style patches.

These patches are basically unified diffs with some extra metadata tacked
on.
"""

import email.parser
import time
from collections.abc import Generator
from difflib import SequenceMatcher
from typing import (
    IO,
    TYPE_CHECKING,
    BinaryIO,
    Optional,
    TextIO,
    Union,
)

if TYPE_CHECKING:
    import email.message

    from .object_store import BaseObjectStore

from .objects import S_ISGITLINK, Blob, Commit

FIRST_FEW_BYTES = 8000


def write_commit_patch(
    f: IO[bytes],
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual file patch.

    Args:
      f: File-like object to write to
      commit: Commit object
      contents: Contents of the patch
      progress: Tuple with current patch number and total number of patches
      version: Version string to include in the patch header
      encoding: Encoding to use for the patch
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        f.write(version.encode(encoding) + b"\n")
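
# Usage sketch (illustrative): assuming ``commit`` is a fully populated
# Commit object and ``diff_bytes`` holds a unified diff, the only patch of a
# one-patch series could be written like this:
#
#     import io
#
#     buf = io.BytesIO()
#     write_commit_patch(buf, commit, diff_bytes, (1, 1))
#     patch_bytes = buf.getvalue()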


def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string
    """
    decoded = commit.message.decode(errors="replace")
    lines = decoded.splitlines()
    return lines[0].replace(" ", "-") if lines else ""
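
# For example, a commit whose message starts with b"Fix crash in diff"
# yields the summary "Fix-crash-in-diff", which is safe to embed in a patch
# filename.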


# Unified Diff
def _format_range_unified(start: int, stop: int) -> str:
    """Convert range to the "ed" format."""
    # Per the diff spec at http://www.unix.org/single_unix_specification/
    beginning = start + 1  # lines start numbering with one
    length = stop - start
    if length == 1:
        return f"{beginning}"
    if not length:
        beginning -= 1  # empty ranges begin at line just before the range
    return f"{beginning},{length}"
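
# For instance, _format_range_unified(0, 3) returns "1,3" and
# _format_range_unified(2, 3) returns "3", matching the ranges that appear in
# "@@ -... +... @@" hunk headers.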


def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does.

    Based on the same function in Python 2.7's difflib.py.
    """
    started = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not started:
            started = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        file1_range = _format_range_unified(first[1], last[2])
        file2_range = _format_range_unified(first[3], last[4])
        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag in ("replace", "delete"):
                for line in a[i1:i2]:
                    if line[-1:] != b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"-" + line
            if tag in ("replace", "insert"):
                for line in b[j1:j2]:
                    if line[-1:] != b"\n":
                        line += b"\n\\ No newline at end of file\n"
                    yield b"+" + line
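
# Usage sketch (illustrative file names): diffing two small line lists and
# writing the result to stdout:
#
#     import sys
#
#     old = [b"hello\n", b"world\n"]
#     new = [b"hello\n", b"there\n", b"world\n"]
#     for chunk in unified_diff(old, new, b"a/greeting.txt", b"b/greeting.txt"):
#         sys.stdout.buffer.write(chunk)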


def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content

    Returns:
      True if a NUL byte occurs in the first FIRST_FEW_BYTES bytes
    """
    return b"\0" in content[:FIRST_FEW_BYTES]
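
# For example, is_binary(b"hello\n") is False, while is_binary(b"\x00\x01")
# is True; only the first FIRST_FEW_BYTES bytes are inspected.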


def shortid(hexsha: Optional[bytes]) -> bytes:
    """Get short object ID.

    Args:
      hexsha: Full hex SHA or None

    Returns:
      7-character short ID
    """
    if hexsha is None:
        return b"0" * 7
    else:
        return hexsha[:7]
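
# For example, shortid(b"0123456789abcdef0123456789abcdef01234567") returns
# b"0123456", and shortid(None) returns b"0000000", mirroring how git prints
# index lines for missing blobs.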


def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Generate patch filename.

    Args:
      p: Path or None
      root: Root directory

    Returns:
      Full patch filename
    """
    if p is None:
        return b"/dev/null"
    else:
        return root + b"/" + p
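
# For example, patch_filename(b"dulwich/patch.py", b"a") returns
# b"a/dulwich/patch.py", while a missing file (None) maps to b"/dev/null",
# as in git's diff headers.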


def write_object_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        """Get blob content for a file.

        Args:
          mode: File mode
          hexsha: Object SHA

        Returns:
          Blob object
        """
        if hexsha is None:
            return Blob.from_string(b"")
        elif mode is not None and S_ISGITLINK(mode):
            return Blob.from_string(b"Subproject commit " + hexsha + b"\n")
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return Blob.from_string(obj.as_raw_string())

    def lines(content: "Blob") -> list[bytes]:
        """Split blob content into lines.

        Args:
          content: Blob content

        Returns:
          List of lines
        """
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
            )
        )
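
# Usage sketch (MemoryObjectStore and the file name are illustrative):
#
#     from io import BytesIO
#
#     from dulwich.object_store import MemoryObjectStore
#
#     store = MemoryObjectStore()
#     old_blob = Blob.from_string(b"hello\n")
#     new_blob = Blob.from_string(b"hello world\n")
#     store.add_object(old_blob)
#     store.add_object(new_blob)
#     buf = BytesIO()
#     write_object_diff(
#         buf,
#         store,
#         (b"greeting.txt", 0o100644, old_blob.id),
#         (b"greeting.txt", 0o100644, new_blob.id),
#     )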


# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Generate the header lines for a blob diff.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """
    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = patch_filename(old_path, b"a")
    new_path = patch_filename(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is not None:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
        else:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
    yield b"index " + shortid(old_sha) + b".." + shortid(new_sha)
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"
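
# For example, a content-only change (mode unchanged at 100644) yields lines
# of the form (the shortened SHAs are illustrative):
#
#     b"diff --git a/foo.txt b/foo.txt\n"
#     b"index 0123456..89abcde 100644\n"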


# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: IO[bytes],
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (elements are None for nonexistent files)
      new_file: (path, mode, blob) tuple (elements are None for nonexistent files)

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def lines(blob: Optional["Blob"]) -> list[bytes]:
        """Split blob content into lines.

        Args:
          blob: Blob object or None

        Returns:
          List of lines
        """
        if blob is not None:
            return blob.splitlines()
        else:
            return []

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    old_contents = lines(old_blob)
    new_contents = lines(new_blob)
    f.writelines(
        unified_diff(old_contents, new_contents, patched_old_path, patched_new_path)
    )
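
# Usage sketch: diffing two in-memory blobs without an object store:
#
#     from io import BytesIO
#
#     buf = BytesIO()
#     write_blob_diff(
#         buf,
#         (b"foo.txt", 0o100644, Blob.from_string(b"old\n")),
#         (b"foo.txt", 0o100644, Blob.from_string(b"new\n")),
#     )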


def write_tree_diff(
    f: IO[bytes],
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      store: Object store to read from
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
    """
    changes = store.tree_changes(old_tree, new_tree)
    for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes:
        write_object_diff(
            f,
            store,
            (oldpath, oldmode, oldsha),
            (newpath, newmode, newsha),
            diff_binary=diff_binary,
        )
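
# Usage sketch (assumes an open dulwich Repo ``r`` whose HEAD commit has a
# parent):
#
#     from io import BytesIO
#
#     head = r[r.head()]
#     parent = r[head.parents[0]]
#     buf = BytesIO()
#     write_tree_diff(buf, r.object_store, parent.tree, head.tree)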


def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    encoding = encoding or getattr(f, "encoding", "ascii")
    encoding = encoding or "ascii"
    contents = f.read()
    if isinstance(contents, bytes):
        bparser = email.parser.BytesParser()
        msg = bparser.parsebytes(contents)
    else:
        uparser = email.parser.Parser()
        msg = uparser.parsestr(contents)
    return parse_patch_message(msg, encoding)
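
# Usage sketch (the file name is illustrative): reading a patch produced by
# "git format-patch" from disk:
#
#     with open("0001-some-change.patch", "rb") as patch_file:
#         commit, diff, version = git_am_patch_split(patch_file)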


def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version
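
# Usage sketch: parsing an already loaded e-mail message, where
# ``patch_bytes`` is assumed to hold the raw message bytes:
#
#     import email
#
#     msg = email.message_from_bytes(patch_bytes)
#     commit, diff, version = parse_patch_message(msg, encoding="utf-8")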