Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/patch.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

205 statements  

1# patch.py -- For dealing with packed-style patches. 

2# Copyright (C) 2009-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

3# 

4# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as published by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Classes for dealing with git am-style patches. 

23 

24These patches are basically unified diffs with some extra metadata tacked 

25on. 

26""" 

27 

import email.parser
import time
from collections.abc import Generator
from difflib import SequenceMatcher
from typing import (
    TYPE_CHECKING,
    BinaryIO,
    Optional,
    TextIO,
    Union,
    cast,
)

if TYPE_CHECKING:
    import email.message

    from .object_store import BaseObjectStore

from .objects import S_ISGITLINK, Blob, Commit

46 

# Number of leading bytes scanned by is_binary() when deciding whether
# content should be treated as binary (a NUL byte within this prefix).
FIRST_FEW_BYTES = 8000

48 

49 

def write_commit_patch(
    f: BinaryIO,
    commit: "Commit",
    contents: Union[str, bytes],
    progress: tuple[int, int],
    version: Optional[str] = None,
    encoding: Optional[str] = None,
) -> None:
    """Write an individual commit as a git am-style patch file.

    Args:
      f: Binary file-like object to write the patch to
      commit: Commit object to take id/author/date/message from
      progress: tuple with current patch number and total
      version: version string for the trailer; defaults to the Dulwich
        version when None
      encoding: text encoding; falls back to ``f.encoding`` or "ascii"
    """
    # getattr may return an explicit None (e.g. f.encoding is None), so the
    # fallback needs both the `or` and the None check.
    encoding = encoding or getattr(f, "encoding", "ascii")
    if encoding is None:
        encoding = "ascii"
    if isinstance(contents, str):
        contents = contents.encode(encoding)
    (num, total) = progress
    f.write(
        b"From "
        + commit.id
        + b" "
        + time.ctime(commit.commit_time).encode(encoding)
        + b"\n"
    )
    f.write(b"From: " + commit.author + b"\n")
    f.write(
        b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n"
    )
    f.write(
        (f"Subject: [PATCH {num}/{total}] ").encode(encoding) + commit.message + b"\n"
    )
    f.write(b"\n")
    f.write(b"---\n")
    # Best effort: include a diffstat section when the external "diffstat"
    # tool is available; silently skip it otherwise.
    try:
        import subprocess

        p = subprocess.Popen(
            ["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE
        )
    except (ImportError, OSError):
        pass  # diffstat not available?
    else:
        (diffstat, _) = p.communicate(contents)
        f.write(diffstat)
        f.write(b"\n")
    f.write(contents)
    f.write(b"-- \n")
    if version is None:
        from dulwich import __version__ as dulwich_version

        f.write(b"Dulwich %d.%d.%d\n" % dulwich_version)
    else:
        # encoding is guaranteed non-None here (normalized above).
        f.write(version.encode(encoding) + b"\n")

111 

112 

def get_summary(commit: "Commit") -> str:
    """Determine the summary line for use in a filename.

    Args:
      commit: Commit
    Returns: Summary string
    """
    text = commit.message.decode(errors="replace")
    # The summary is the first line with spaces turned into dashes;
    # an empty message yields an empty summary.
    for first_line in text.splitlines():
        return first_line.replace(" ", "-")
    return ""

123 

124 

125# Unified Diff 

126def _format_range_unified(start: int, stop: int) -> str: 

127 """Convert range to the "ed" format.""" 

128 # Per the diff spec at http://www.unix.org/single_unix_specification/ 

129 beginning = start + 1 # lines start numbering with one 

130 length = stop - start 

131 if length == 1: 

132 return f"{beginning}" 

133 if not length: 

134 beginning -= 1 # empty ranges begin at line just before the range 

135 return f"{beginning},{length}" 

136 

137 

def unified_diff(
    a: list[bytes],
    b: list[bytes],
    fromfile: bytes = b"",
    tofile: bytes = b"",
    fromfiledate: str = "",
    tofiledate: str = "",
    n: int = 3,
    lineterm: str = "\n",
    tree_encoding: str = "utf-8",
    output_encoding: str = "utf-8",
) -> Generator[bytes, None, None]:
    """difflib.unified_diff that can detect "No newline at end of file" as
    original "git diff" does.

    Based on the same function in Python2.7 difflib.py
    """

    def format_range(start: int, stop: int) -> str:
        # "ed"-style hunk range (same logic as _format_range_unified).
        beginning = start + 1  # lines start numbering with one
        length = stop - start
        if length == 1:
            return f"{beginning}"
        if not length:
            beginning -= 1  # empty ranges begin at line just before the range
        return f"{beginning},{length}"

    def mark_missing_newline(line: bytes) -> bytes:
        # Annotate lines lacking a final newline the way "git diff" does.
        if line.endswith(b"\n"):
            return line
        return line + b"\n\\ No newline at end of file\n"

    header_emitted = False
    for group in SequenceMatcher(a=a, b=b).get_grouped_opcodes(n):
        if not header_emitted:
            header_emitted = True
            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
            todate = f"\t{tofiledate}" if tofiledate else ""
            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
                output_encoding
            )
            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
                output_encoding
            )

        first, last = group[0], group[-1]
        range1 = format_range(first[1], last[2])
        range2 = format_range(first[3], last[4])
        yield f"@@ -{range1} +{range2} @@{lineterm}".encode(output_encoding)

        for tag, i1, i2, j1, j2 in group:
            if tag == "equal":
                for line in a[i1:i2]:
                    yield b" " + line
                continue
            if tag != "insert":  # "replace" or "delete": removed lines first
                for line in a[i1:i2]:
                    yield b"-" + mark_missing_newline(line)
            if tag != "delete":  # "replace" or "insert": then added lines
                for line in b[j1:j2]:
                    yield b"+" + mark_missing_newline(line)

188 

189 

def is_binary(content: bytes) -> bool:
    """See if the first few bytes contain any null characters.

    Args:
      content: Bytestring to check for binary content
    """
    # Only the leading FIRST_FEW_BYTES are inspected; a NUL anywhere in
    # that prefix marks the content as binary.
    return content[:FIRST_FEW_BYTES].find(b"\0") != -1

197 

198 

def shortid(hexsha: Optional[bytes]) -> bytes:
    """Return the abbreviated (7-byte) form of an object id.

    A missing sha (None) is rendered as seven zeroes, matching how git
    displays a nonexistent object.
    """
    if hexsha is not None:
        return hexsha[:7]
    return b"0000000"

204 

205 

def patch_filename(p: Optional[bytes], root: bytes) -> bytes:
    """Return the path as it appears in a diff header.

    A missing path (None) maps to /dev/null, as git shows for
    added/deleted files; otherwise the root prefix is joined on.
    """
    return b"/dev/null" if p is None else root + b"/" + p

211 

212 

def write_object_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
    diff_binary: bool = False,
) -> None:
    """Write the diff for an object.

    Args:
      f: File-like object to write to
      store: Store to retrieve objects from, if necessary
      old_file: (path, mode, hexsha) tuple
      new_file: (path, mode, hexsha) tuple
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().

    Note: the tuple elements should be None for nonexistent files
    """
    (old_path, old_mode, old_id) = old_file
    (new_path, new_mode, new_id) = new_file
    patched_old_path = patch_filename(old_path, b"a")
    patched_new_path = patch_filename(new_path, b"b")

    def content(mode: Optional[int], hexsha: Optional[bytes]) -> Blob:
        """Resolve (mode, hexsha) to a Blob holding the content to diff."""
        if hexsha is None:
            # Nonexistent file: diff against empty content.
            return cast(Blob, Blob.from_string(b""))
        elif mode is not None and S_ISGITLINK(mode):
            # Submodule entry: git shows a synthetic one-line blob.
            return cast(Blob, Blob.from_string(b"Subproject commit " + hexsha + b"\n"))
        else:
            obj = store[hexsha]
            if isinstance(obj, Blob):
                return obj
            else:
                # Fallback for non-blob objects
                return cast(Blob, Blob.from_string(obj.as_raw_string()))

    def lines(content: "Blob") -> list[bytes]:
        """Split blob content into lines; an empty blob yields no lines."""
        if not content:
            return []
        else:
            return content.splitlines()

    f.writelines(
        gen_diff_header((old_path, new_path), (old_mode, new_mode), (old_id, new_id))
    )
    old_content = content(old_mode, old_id)
    new_content = content(new_mode, new_id)
    if not diff_binary and (is_binary(old_content.data) or is_binary(new_content.data)):
        binary_diff = (
            b"Binary files "
            + patched_old_path
            + b" and "
            + patched_new_path
            + b" differ\n"
        )
        f.write(binary_diff)
    else:
        f.writelines(
            unified_diff(
                lines(old_content),
                lines(new_content),
                patched_old_path,
                patched_new_path,
            )
        )

281 

282 

283# TODO(jelmer): Support writing unicode, rather than bytes. 

# TODO(jelmer): Support writing unicode, rather than bytes.
def gen_diff_header(
    paths: tuple[Optional[bytes], Optional[bytes]],
    modes: tuple[Optional[int], Optional[int]],
    shas: tuple[Optional[bytes], Optional[bytes]],
) -> Generator[bytes, None, None]:
    """Write a blob diff header.

    Args:
      paths: Tuple with old and new path
      modes: Tuple with old and new modes
      shas: Tuple with old and new shas
    """

    def abbrev(hexsha: Optional[bytes]) -> bytes:
        # Abbreviated object id; all zeroes for a missing object.
        return b"0" * 7 if hexsha is None else hexsha[:7]

    def header_path(p: Optional[bytes], root: bytes) -> bytes:
        # /dev/null stands in for a nonexistent file, as in git.
        return b"/dev/null" if p is None else root + b"/" + p

    (old_path, new_path) = paths
    (old_mode, new_mode) = modes
    (old_sha, new_sha) = shas
    # An add/delete still shows both sides; reuse the known path.
    if old_path is None and new_path is not None:
        old_path = new_path
    if new_path is None and old_path is not None:
        new_path = old_path
    old_path = header_path(old_path, b"a")
    new_path = header_path(new_path, b"b")
    yield b"diff --git " + old_path + b" " + new_path + b"\n"

    if old_mode != new_mode:
        if new_mode is None:
            yield (f"deleted file mode {old_mode:o}\n").encode("ascii")
        else:
            if old_mode is not None:
                yield (f"old file mode {old_mode:o}\n").encode("ascii")
            yield (f"new file mode {new_mode:o}\n").encode("ascii")
    yield b"index " + abbrev(old_sha) + b".." + abbrev(new_sha)
    if new_mode is not None and old_mode is not None:
        yield (f" {new_mode:o}").encode("ascii")
    yield b"\n"

318 

319 

320# TODO(jelmer): Support writing unicode, rather than bytes. 

# TODO(jelmer): Support writing unicode, rather than bytes.
def write_blob_diff(
    f: BinaryIO,
    old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
    new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
) -> None:
    """Write blob diff.

    Args:
      f: File-like object to write to
      old_file: (path, mode, blob) tuple (None if nonexisting)
      new_file: (path, mode, blob) tuple (None if nonexisting)

    Note: The use of write_object_diff is recommended over this function.
    """
    (old_path, old_mode, old_blob) = old_file
    (new_path, new_mode, new_blob) = new_file
    diff_old_path = patch_filename(old_path, b"a")
    diff_new_path = patch_filename(new_path, b"b")

    def blob_lines(blob: Optional["Blob"]) -> list[bytes]:
        # A missing blob diffs as empty content.
        return [] if blob is None else blob.splitlines()

    f.writelines(
        gen_diff_header(
            (old_path, new_path),
            (old_mode, new_mode),
            (getattr(old_blob, "id", None), getattr(new_blob, "id", None)),
        )
    )
    f.writelines(
        unified_diff(
            blob_lines(old_blob),
            blob_lines(new_blob),
            diff_old_path,
            diff_new_path,
        )
    )

358 

359 

def write_tree_diff(
    f: BinaryIO,
    store: "BaseObjectStore",
    old_tree: Optional[bytes],
    new_tree: Optional[bytes],
    diff_binary: bool = False,
) -> None:
    """Write tree diff.

    Args:
      f: File-like object to write to.
      old_tree: Old tree id
      new_tree: New tree id
      diff_binary: Whether to diff files even if they
        are considered binary files by is_binary().
    """
    # Emit one object diff per changed entry, in tree_changes order.
    for change in store.tree_changes(old_tree, new_tree):
        (paths, modes, shas) = change
        write_object_diff(
            f,
            store,
            (paths[0], modes[0], shas[0]),
            (paths[1], modes[1], shas[1]),
            diff_binary=diff_binary,
        )

385 

386 

def git_am_patch_split(
    f: Union[TextIO, BinaryIO], encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Parse a git-am-style patch and split it up into bits.

    Args:
      f: File-like object to parse
      encoding: Encoding to use when creating Git objects
    Returns: Tuple with commit object, diff contents and git version
    """
    # f.encoding may be absent or explicitly None; fall back to ascii.
    encoding = encoding or getattr(f, "encoding", "ascii") or "ascii"
    contents = f.read()
    # Pick the parser matching the stream type (binary vs. text).
    if isinstance(contents, bytes):
        msg = email.parser.BytesParser().parsebytes(contents)
    else:
        msg = email.parser.Parser().parsestr(contents)
    return parse_patch_message(msg, encoding)

407 

408 

def parse_patch_message(
    msg: "email.message.Message", encoding: Optional[str] = None
) -> tuple["Commit", bytes, Optional[bytes]]:
    """Extract a Commit object and patch from an e-mail message.

    Args:
      msg: An email message (email.message.Message)
      encoding: Encoding to use to encode Git commits
    Returns: Tuple with commit object, diff contents and git version
    """
    c = Commit()
    if encoding is None:
        encoding = "ascii"
    # The e-mail sender is used for both author and committer by default.
    c.author = msg["from"].encode(encoding)
    c.committer = msg["from"].encode(encoding)
    try:
        # Strip a "[PATCH n/m]" prefix from the subject, if present.
        patch_tag_start = msg["subject"].index("[PATCH")
    except ValueError:
        subject = msg["subject"]
    else:
        close = msg["subject"].index("] ", patch_tag_start)
        subject = msg["subject"][close + 2 :]
    c.message = (subject.replace("\n", "") + "\n").encode(encoding)
    first = True

    body = msg.get_payload(decode=True)
    if isinstance(body, str):
        body = body.encode(encoding)
    if isinstance(body, bytes):
        lines = body.splitlines(True)
    else:
        # Handle other types by converting to string first
        lines = str(body).encode(encoding).splitlines(True)
    line_iter = iter(lines)

    # Everything up to the "---" separator belongs to the commit message.
    # A "From: " pseudo-header on the very first body line overrides the
    # e-mail author (the convention git format-patch uses for non-ASCII
    # author names).
    for line in line_iter:
        if line == b"---\n":
            break
        if first:
            if line.startswith(b"From: "):
                c.author = line[len(b"From: ") :].rstrip()
            else:
                c.message += b"\n" + line
            first = False
        else:
            c.message += line
    # Between "---" and the "-- " signature marker lies the diff itself.
    diff = b""
    for line in line_iter:
        if line == b"-- \n":
            break
        diff += line
    # The line after the signature marker, if present, names the tool
    # version (e.g. the git version that produced the patch).
    try:
        version = next(line_iter).rstrip(b"\n")
    except StopIteration:
        version = None
    return c, diff, version