Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

960 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25import binascii 

26import os 

27import posixpath 

28import stat 

29import sys 

30import zlib 

31from collections import namedtuple 

32from collections.abc import Callable, Iterable, Iterator 

33from hashlib import sha1 

34from io import BufferedIOBase, BytesIO 

35from typing import ( 

36 IO, 

37 TYPE_CHECKING, 

38 Optional, 

39 Union, 

40) 

41 

42if sys.version_info >= (3, 11): 

43 from typing import Self 

44else: 

45 from typing_extensions import Self 

46 

47try: 

48 from typing import TypeGuard # type: ignore 

49except ImportError: 

50 from typing_extensions import TypeGuard 

51 

52from . import replace_me 

53from .errors import ( 

54 ChecksumMismatch, 

55 FileFormatException, 

56 NotBlobError, 

57 NotCommitError, 

58 NotTagError, 

59 NotTreeError, 

60 ObjectFormatException, 

61) 

62from .file import GitFile 

63 

64if TYPE_CHECKING: 

65 from _hashlib import HASH 

66 

67 from .file import _GitFile 

68 

# All-zeros SHA, used by git to denote a nonexistent object.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"tag"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# Tree-entry mode bits that mark a gitlink (submodule) entry.
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Marker that introduces a detached PGP signature in a tag/commit body.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"


# Object identifiers are 40-byte hex SHA bytestrings.
ObjectID = bytes

96 

97 

98class EmptyFileException(FileFormatException): 

99 """An unexpectedly empty file was encountered.""" 

100 

101 

102def S_ISGITLINK(m: int) -> bool: 

103 """Check if a mode indicates a submodule. 

104 

105 Args: 

106 m: Mode to check 

107 Returns: a ``boolean`` 

108 """ 

109 return stat.S_IFMT(m) == S_IFGITLINK 

110 

111 

112def _decompress(string: bytes) -> bytes: 

113 dcomp = zlib.decompressobj() 

114 dcomped = dcomp.decompress(string) 

115 dcomped += dcomp.flush() 

116 return dcomped 

117 

118 

119def sha_to_hex(sha: ObjectID) -> bytes: 

120 """Takes a string and returns the hex of the sha within.""" 

121 hexsha = binascii.hexlify(sha) 

122 assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}" 

123 return hexsha 

124 

125 

126def hex_to_sha(hex: Union[bytes, str]) -> bytes: 

127 """Takes a hex sha and returns a binary sha.""" 

128 assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}" 

129 try: 

130 return binascii.unhexlify(hex) 

131 except TypeError as exc: 

132 if not isinstance(hex, bytes): 

133 raise 

134 raise ValueError(exc.args[0]) from exc 

135 

136 

137def valid_hexsha(hex: Union[bytes, str]) -> bool: 

138 if len(hex) != 40: 

139 return False 

140 try: 

141 binascii.unhexlify(hex) 

142 except (TypeError, binascii.Error): 

143 return False 

144 else: 

145 return True 

146 

147 

148def hex_to_filename( 

149 path: Union[str, bytes], hex: Union[str, bytes] 

150) -> Union[str, bytes]: 

151 """Takes a hex sha and returns its filename relative to the given path.""" 

152 # os.path.join accepts bytes or unicode, but all args must be of the same 

153 # type. Make sure that hex which is expected to be bytes, is the same type 

154 # as path. 

155 if type(path) is not type(hex) and isinstance(path, str): 

156 hex = hex.decode("ascii") # type: ignore 

157 dir_name = hex[:2] 

158 file_name = hex[2:] 

159 # Check from object dir 

160 return os.path.join(path, dir_name, file_name) # type: ignore 

161 

162 

163def filename_to_hex(filename: Union[str, bytes]) -> str: 

164 """Takes an object filename and returns its corresponding hex sha.""" 

165 # grab the last (up to) two path components 

166 names = filename.rsplit(os.path.sep, 2)[-2:] # type: ignore 

167 errmsg = f"Invalid object filename: {filename!r}" 

168 assert len(names) == 2, errmsg 

169 base, rest = names 

170 assert len(base) == 2 and len(rest) == 38, errmsg 

171 hex_bytes = (base + rest).encode("ascii") # type: ignore 

172 hex_to_sha(hex_bytes) 

173 return hex_bytes.decode("ascii") 

174 

175 

176def object_header(num_type: int, length: int) -> bytes: 

177 """Return an object header for the given numeric type and text length.""" 

178 cls = object_class(num_type) 

179 if cls is None: 

180 raise AssertionError(f"unsupported class type num: {num_type}") 

181 return cls.type_name + b" " + str(length).encode("ascii") + b"\0" 

182 

183 

184def serializable_property(name: str, docstring: Optional[str] = None) -> property: 

185 """A property that helps tracking whether serialization is necessary.""" 

186 

187 def set(obj: "ShaFile", value: object) -> None: 

188 setattr(obj, "_" + name, value) 

189 obj._needs_serialization = True 

190 

191 def get(obj: "ShaFile") -> object: 

192 return getattr(obj, "_" + name) 

193 

194 return property(get, set, doc=docstring) 

195 

196 

197def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]: 

198 """Get the object class corresponding to the given type. 

199 

200 Args: 

201 type: Either a type name string or a numeric type. 

202 Returns: The ShaFile subclass corresponding to the given type, or None if 

203 type is not a valid type name/number. 

204 """ 

205 return _TYPE_MAP.get(type, None) 

206 

207 

208def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None: 

209 """Check if a string is a valid hex sha string. 

210 

211 Args: 

212 hex: Hex string to check 

213 error_msg: Error message to use in exception 

214 Raises: 

215 ObjectFormatException: Raised when the string is not valid 

216 """ 

217 if not valid_hexsha(hex): 

218 raise ObjectFormatException(f"{error_msg} {hex!r}") 

219 

220 

221def check_identity(identity: Optional[bytes], error_msg: str) -> None: 

222 """Check if the specified identity is valid. 

223 

224 This will raise an exception if the identity is not valid. 

225 

226 Args: 

227 identity: Identity string 

228 error_msg: Error message to use in exception 

229 """ 

230 if identity is None: 

231 raise ObjectFormatException(error_msg) 

232 email_start = identity.find(b"<") 

233 email_end = identity.find(b">") 

234 if not all( 

235 [ 

236 email_start >= 1, 

237 identity[email_start - 1] == b" "[0], 

238 identity.find(b"<", email_start + 1) == -1, 

239 email_end == len(identity) - 1, 

240 b"\0" not in identity, 

241 b"\n" not in identity, 

242 ] 

243 ): 

244 raise ObjectFormatException(error_msg) 

245 

246 

247def check_time(time_seconds: int) -> None: 

248 """Check if the specified time is not prone to overflow error. 

249 

250 This will raise an exception if the time is not valid. 

251 

252 Args: 

253 time_seconds: time in seconds 

254 

255 """ 

256 # Prevent overflow error 

257 if time_seconds > MAX_TIME: 

258 raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}") 

259 

260 

261def git_line(*items: bytes) -> bytes: 

262 """Formats items into a space separated line.""" 

263 return b" ".join(items) + b"\n" 

264 

265 

266class FixedSha: 

267 """SHA object that behaves like hashlib's but is given a fixed value.""" 

268 

269 __slots__ = ("_hexsha", "_sha") 

270 

271 def __init__(self, hexsha: Union[str, bytes]) -> None: 

272 if isinstance(hexsha, str): 

273 hexsha = hexsha.encode("ascii") # type: ignore 

274 if not isinstance(hexsha, bytes): 

275 raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}") 

276 self._hexsha = hexsha 

277 self._sha = hex_to_sha(hexsha) 

278 

279 def digest(self) -> bytes: 

280 """Return the raw SHA digest.""" 

281 return self._sha 

282 

283 def hexdigest(self) -> str: 

284 """Return the hex SHA digest.""" 

285 return self._hexsha.decode("ascii") 

286 

287 

288# Type guard functions for runtime type narrowing 

289if TYPE_CHECKING: 

290 

291 def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]: 

292 """Check if a ShaFile is a Commit.""" 

293 return obj.type_name == b"commit" 

294 

295 def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]: 

296 """Check if a ShaFile is a Tree.""" 

297 return obj.type_name == b"tree" 

298 

299 def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]: 

300 """Check if a ShaFile is a Blob.""" 

301 return obj.type_name == b"blob" 

302 

303 def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]: 

304 """Check if a ShaFile is a Tag.""" 

305 return obj.type_name == b"tag" 

306else: 

307 # Runtime versions without type narrowing 

308 def is_commit(obj: "ShaFile") -> bool: 

309 """Check if a ShaFile is a Commit.""" 

310 return obj.type_name == b"commit" 

311 

312 def is_tree(obj: "ShaFile") -> bool: 

313 """Check if a ShaFile is a Tree.""" 

314 return obj.type_name == b"tree" 

315 

316 def is_blob(obj: "ShaFile") -> bool: 

317 """Check if a ShaFile is a Blob.""" 

318 return obj.type_name == b"blob" 

319 

320 def is_tag(obj: "ShaFile") -> bool: 

321 """Check if a ShaFile is a Tag.""" 

322 return obj.type_name == b"tag" 

323 

324 

325class ShaFile: 

326 """A git SHA file.""" 

327 

328 __slots__ = ("_chunked_text", "_needs_serialization", "_sha") 

329 

330 _needs_serialization: bool 

331 type_name: bytes 

332 type_num: int 

333 _chunked_text: Optional[list[bytes]] 

334 _sha: Union[FixedSha, None, "HASH"] 

335 

336 @staticmethod 

337 def _parse_legacy_object_header( 

338 magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"] 

339 ) -> "ShaFile": 

340 """Parse a legacy object, creating it but not reading the file.""" 

341 bufsize = 1024 

342 decomp = zlib.decompressobj() 

343 header = decomp.decompress(magic) 

344 start = 0 

345 end = -1 

346 while end < 0: 

347 extra = f.read(bufsize) 

348 header += decomp.decompress(extra) 

349 magic += extra 

350 end = header.find(b"\0", start) 

351 start = len(header) 

352 header = header[:end] 

353 type_name, size = header.split(b" ", 1) 

354 try: 

355 int(size) # sanity check 

356 except ValueError as exc: 

357 raise ObjectFormatException(f"Object size not an integer: {exc}") from exc 

358 obj_class = object_class(type_name) 

359 if not obj_class: 

360 raise ObjectFormatException( 

361 "Not a known type: {}".format(type_name.decode("ascii")) 

362 ) 

363 return obj_class() 

364 

365 def _parse_legacy_object(self, map: bytes) -> None: 

366 """Parse a legacy object, setting the raw string.""" 

367 text = _decompress(map) 

368 header_end = text.find(b"\0") 

369 if header_end < 0: 

370 raise ObjectFormatException("Invalid object header, no \\0") 

371 self.set_raw_string(text[header_end + 1 :]) 

372 

373 def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]: 

374 """Return chunks representing the object in the experimental format. 

375 

376 Returns: List of strings 

377 """ 

378 compobj = zlib.compressobj(compression_level) 

379 yield compobj.compress(self._header()) 

380 for chunk in self.as_raw_chunks(): 

381 yield compobj.compress(chunk) 

382 yield compobj.flush() 

383 

384 def as_legacy_object(self, compression_level: int = -1) -> bytes: 

385 """Return string representing the object in the experimental format.""" 

386 return b"".join( 

387 self.as_legacy_object_chunks(compression_level=compression_level) 

388 ) 

389 

390 def as_raw_chunks(self) -> list[bytes]: 

391 """Return chunks with serialization of the object. 

392 

393 Returns: List of strings, not necessarily one per line 

394 """ 

395 if self._needs_serialization: 

396 self._sha = None 

397 self._chunked_text = self._serialize() 

398 self._needs_serialization = False 

399 return self._chunked_text # type: ignore 

400 

401 def as_raw_string(self) -> bytes: 

402 """Return raw string with serialization of the object. 

403 

404 Returns: String object 

405 """ 

406 return b"".join(self.as_raw_chunks()) 

407 

408 def __bytes__(self) -> bytes: 

409 """Return raw string serialization of this object.""" 

410 return self.as_raw_string() 

411 

412 def __hash__(self) -> int: 

413 """Return unique hash for this object.""" 

414 return hash(self.id) 

415 

416 def as_pretty_string(self) -> str: 

417 """Return a string representing this object, fit for display.""" 

418 return self.as_raw_string().decode("utf-8", "replace") 

419 

420 def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None: 

421 """Set the contents of this object from a serialized string.""" 

422 if not isinstance(text, bytes): 

423 raise TypeError(f"Expected bytes for text, got {text!r}") 

424 self.set_raw_chunks([text], sha) 

425 

426 def set_raw_chunks( 

427 self, chunks: list[bytes], sha: Optional[ObjectID] = None 

428 ) -> None: 

429 """Set the contents of this object from a list of chunks.""" 

430 self._chunked_text = chunks 

431 self._deserialize(chunks) 

432 if sha is None: 

433 self._sha = None 

434 else: 

435 self._sha = FixedSha(sha) # type: ignore 

436 self._needs_serialization = False 

437 

438 @staticmethod 

439 def _parse_object_header( 

440 magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"] 

441 ) -> "ShaFile": 

442 """Parse a new style object, creating it but not reading the file.""" 

443 num_type = (ord(magic[0:1]) >> 4) & 7 

444 obj_class = object_class(num_type) 

445 if not obj_class: 

446 raise ObjectFormatException(f"Not a known type {num_type}") 

447 return obj_class() 

448 

449 def _parse_object(self, map: bytes) -> None: 

450 """Parse a new style object, setting self._text.""" 

451 # skip type and size; type must have already been determined, and 

452 # we trust zlib to fail if it's otherwise corrupted 

453 byte = ord(map[0:1]) 

454 used = 1 

455 while (byte & 0x80) != 0: 

456 byte = ord(map[used : used + 1]) 

457 used += 1 

458 raw = map[used:] 

459 self.set_raw_string(_decompress(raw)) 

460 

461 @classmethod 

462 def _is_legacy_object(cls, magic: bytes) -> bool: 

463 b0 = ord(magic[0:1]) 

464 b1 = ord(magic[1:2]) 

465 word = (b0 << 8) + b1 

466 return (b0 & 0x8F) == 0x08 and (word % 31) == 0 

467 

468 @classmethod 

469 def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile": 

470 map = f.read() 

471 if not map: 

472 raise EmptyFileException("Corrupted empty file detected") 

473 

474 if cls._is_legacy_object(map): 

475 obj = cls._parse_legacy_object_header(map, f) 

476 obj._parse_legacy_object(map) 

477 else: 

478 obj = cls._parse_object_header(map, f) 

479 obj._parse_object(map) 

480 return obj 

481 

482 def __init__(self) -> None: 

483 """Don't call this directly.""" 

484 self._sha = None 

485 self._chunked_text = [] 

486 self._needs_serialization = True 

487 

488 def _deserialize(self, chunks: list[bytes]) -> None: 

489 raise NotImplementedError(self._deserialize) 

490 

491 def _serialize(self) -> list[bytes]: 

492 raise NotImplementedError(self._serialize) 

493 

494 @classmethod 

495 def from_path(cls, path: Union[str, bytes]) -> "ShaFile": 

496 """Open a SHA file from disk.""" 

497 with GitFile(path, "rb") as f: 

498 return cls.from_file(f) 

499 

500 @classmethod 

501 def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile": 

502 """Get the contents of a SHA file on disk.""" 

503 try: 

504 obj = cls._parse_file(f) 

505 obj._sha = None 

506 return obj 

507 except (IndexError, ValueError) as exc: 

508 raise ObjectFormatException("invalid object header") from exc 

509 

510 @staticmethod 

511 def from_raw_string( 

512 type_num: int, string: bytes, sha: Optional[ObjectID] = None 

513 ) -> "ShaFile": 

514 """Creates an object of the indicated type from the raw string given. 

515 

516 Args: 

517 type_num: The numeric type of the object. 

518 string: The raw uncompressed contents. 

519 sha: Optional known sha for the object 

520 """ 

521 cls = object_class(type_num) 

522 if cls is None: 

523 raise AssertionError(f"unsupported class type num: {type_num}") 

524 obj = cls() 

525 obj.set_raw_string(string, sha) 

526 return obj 

527 

528 @staticmethod 

529 def from_raw_chunks( 

530 type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None 

531 ) -> "ShaFile": 

532 """Creates an object of the indicated type from the raw chunks given. 

533 

534 Args: 

535 type_num: The numeric type of the object. 

536 chunks: An iterable of the raw uncompressed contents. 

537 sha: Optional known sha for the object 

538 """ 

539 cls = object_class(type_num) 

540 if cls is None: 

541 raise AssertionError(f"unsupported class type num: {type_num}") 

542 obj = cls() 

543 obj.set_raw_chunks(chunks, sha) 

544 return obj 

545 

546 @classmethod 

547 def from_string(cls, string: bytes) -> Self: 

548 """Create a ShaFile from a string.""" 

549 obj = cls() 

550 obj.set_raw_string(string) 

551 return obj 

552 

553 def _check_has_member(self, member: str, error_msg: str) -> None: 

554 """Check that the object has a given member variable. 

555 

556 Args: 

557 member: the member variable to check for 

558 error_msg: the message for an error if the member is missing 

559 Raises: 

560 ObjectFormatException: with the given error_msg if member is 

561 missing or is None 

562 """ 

563 if getattr(self, member, None) is None: 

564 raise ObjectFormatException(error_msg) 

565 

566 def check(self) -> None: 

567 """Check this object for internal consistency. 

568 

569 Raises: 

570 ObjectFormatException: if the object is malformed in some way 

571 ChecksumMismatch: if the object was created with a SHA that does 

572 not match its contents 

573 """ 

574 # TODO: if we find that error-checking during object parsing is a 

575 # performance bottleneck, those checks should be moved to the class's 

576 # check() method during optimization so we can still check the object 

577 # when necessary. 

578 old_sha = self.id 

579 try: 

580 self._deserialize(self.as_raw_chunks()) 

581 self._sha = None 

582 new_sha = self.id 

583 except Exception as exc: 

584 raise ObjectFormatException(exc) from exc 

585 if old_sha != new_sha: 

586 raise ChecksumMismatch(new_sha, old_sha) 

587 

588 def _header(self) -> bytes: 

589 return object_header(self.type_num, self.raw_length()) 

590 

591 def raw_length(self) -> int: 

592 """Returns the length of the raw string of this object.""" 

593 return sum(map(len, self.as_raw_chunks())) 

594 

595 def sha(self) -> Union[FixedSha, "HASH"]: 

596 """The SHA1 object that is the name of this object.""" 

597 if self._sha is None or self._needs_serialization: 

598 # this is a local because as_raw_chunks() overwrites self._sha 

599 new_sha = sha1() 

600 new_sha.update(self._header()) 

601 for chunk in self.as_raw_chunks(): 

602 new_sha.update(chunk) 

603 self._sha = new_sha 

604 return self._sha 

605 

606 def copy(self) -> "ShaFile": 

607 """Create a new copy of this SHA1 object from its raw string.""" 

608 obj_class = object_class(self.type_num) 

609 if obj_class is None: 

610 raise AssertionError(f"invalid type num {self.type_num}") 

611 return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id) 

612 

613 @property 

614 def id(self) -> bytes: 

615 """The hex SHA of this object.""" 

616 return self.sha().hexdigest().encode("ascii") 

617 

618 def __repr__(self) -> str: 

619 return f"<{self.__class__.__name__} {self.id!r}>" 

620 

621 def __ne__(self, other: object) -> bool: 

622 """Check whether this object does not match the other.""" 

623 return not isinstance(other, ShaFile) or self.id != other.id 

624 

625 def __eq__(self, other: object) -> bool: 

626 """Return True if the SHAs of the two objects match.""" 

627 return isinstance(other, ShaFile) and self.id == other.id 

628 

629 def __lt__(self, other: object) -> bool: 

630 """Return whether SHA of this object is less than the other.""" 

631 if not isinstance(other, ShaFile): 

632 raise TypeError 

633 return self.id < other.id 

634 

635 def __le__(self, other: object) -> bool: 

636 """Check whether SHA of this object is less than or equal to the other.""" 

637 if not isinstance(other, ShaFile): 

638 raise TypeError 

639 return self.id <= other.id 

640 

641 

642class Blob(ShaFile): 

643 """A Git Blob object.""" 

644 

645 __slots__ = () 

646 

647 type_name = b"blob" 

648 type_num = 3 

649 

650 _chunked_text: list[bytes] 

651 

652 def __init__(self) -> None: 

653 super().__init__() 

654 self._chunked_text = [] 

655 self._needs_serialization = False 

656 

657 def _get_data(self) -> bytes: 

658 return self.as_raw_string() 

659 

660 def _set_data(self, data: bytes) -> None: 

661 self.set_raw_string(data) 

662 

663 data = property( 

664 _get_data, _set_data, doc="The text contained within the blob object." 

665 ) 

666 

667 def _get_chunked(self) -> list[bytes]: 

668 return self._chunked_text 

669 

670 def _set_chunked(self, chunks: list[bytes]) -> None: 

671 self._chunked_text = chunks 

672 

673 def _serialize(self) -> list[bytes]: 

674 return self._chunked_text 

675 

676 def _deserialize(self, chunks: list[bytes]) -> None: 

677 self._chunked_text = chunks 

678 

679 chunked = property( 

680 _get_chunked, 

681 _set_chunked, 

682 doc="The text in the blob object, as chunks (not necessarily lines)", 

683 ) 

684 

685 @classmethod 

686 def from_path(cls, path: Union[str, bytes]) -> "Blob": 

687 blob = ShaFile.from_path(path) 

688 if not isinstance(blob, cls): 

689 raise NotBlobError(path) 

690 return blob 

691 

692 def check(self) -> None: 

693 """Check this object for internal consistency. 

694 

695 Raises: 

696 ObjectFormatException: if the object is malformed in some way 

697 """ 

698 super().check() 

699 

700 def splitlines(self) -> list[bytes]: 

701 """Return list of lines in this blob. 

702 

703 This preserves the original line endings. 

704 """ 

705 chunks = self.chunked 

706 if not chunks: 

707 return [] 

708 if len(chunks) == 1: 

709 return chunks[0].splitlines(True) 

710 remaining = None 

711 ret = [] 

712 for chunk in chunks: 

713 lines = chunk.splitlines(True) 

714 if len(lines) > 1: 

715 ret.append((remaining or b"") + lines[0]) 

716 ret.extend(lines[1:-1]) 

717 remaining = lines[-1] 

718 elif len(lines) == 1: 

719 if remaining is None: 

720 remaining = lines.pop() 

721 else: 

722 remaining += lines.pop() 

723 if remaining is not None: 

724 ret.append(remaining) 

725 return ret 

726 

727 

728def _parse_message( 

729 chunks: Iterable[bytes], 

730) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]: 

731 """Parse a message with a list of fields and a body. 

732 

733 Args: 

734 chunks: the raw chunks of the tag or commit object. 

735 Returns: iterator of tuples of (field, value), one per header line, in the 

736 order read from the text, possibly including duplicates. Includes a 

737 field named None for the freeform tag/commit text. 

738 """ 

739 f = BytesIO(b"".join(chunks)) 

740 k = None 

741 v = b"" 

742 eof = False 

743 

744 def _strip_last_newline(value: bytes) -> bytes: 

745 """Strip the last newline from value.""" 

746 if value and value.endswith(b"\n"): 

747 return value[:-1] 

748 return value 

749 

750 # Parse the headers 

751 # 

752 # Headers can contain newlines. The next line is indented with a space. 

753 # We store the latest key as 'k', and the accumulated value as 'v'. 

754 for line in f: 

755 if line.startswith(b" "): 

756 # Indented continuation of the previous line 

757 v += line[1:] 

758 else: 

759 if k is not None: 

760 # We parsed a new header, return its value 

761 yield (k, _strip_last_newline(v)) 

762 if line == b"\n": 

763 # Empty line indicates end of headers 

764 break 

765 (k, v) = line.split(b" ", 1) 

766 

767 else: 

768 # We reached end of file before the headers ended. We still need to 

769 # return the previous header, then we need to return a None field for 

770 # the text. 

771 eof = True 

772 if k is not None: 

773 yield (k, _strip_last_newline(v)) 

774 yield (None, None) 

775 

776 if not eof: 

777 # We didn't reach the end of file while parsing headers. We can return 

778 # the rest of the file as a message. 

779 yield (None, f.read()) 

780 

781 f.close() 

782 

783 

784def _format_message( 

785 headers: list[tuple[bytes, bytes]], body: Optional[bytes] 

786) -> Iterator[bytes]: 

787 for field, value in headers: 

788 lines = value.split(b"\n") 

789 yield git_line(field, lines[0]) 

790 for line in lines[1:]: 

791 yield b" " + line + b"\n" 

792 yield b"\n" # There must be a new line after the headers 

793 if body: 

794 yield body 

795 

796 

797class Tag(ShaFile): 

798 """A Git Tag object.""" 

799 

800 type_name = b"tag" 

801 type_num = 4 

802 

803 __slots__ = ( 

804 "_message", 

805 "_name", 

806 "_object_class", 

807 "_object_sha", 

808 "_signature", 

809 "_tag_time", 

810 "_tag_timezone", 

811 "_tag_timezone_neg_utc", 

812 "_tagger", 

813 ) 

814 

815 _message: Optional[bytes] 

816 _name: Optional[bytes] 

817 _object_class: Optional[type["ShaFile"]] 

818 _object_sha: Optional[bytes] 

819 _signature: Optional[bytes] 

820 _tag_time: Optional[int] 

821 _tag_timezone: Optional[int] 

822 _tag_timezone_neg_utc: Optional[bool] 

823 _tagger: Optional[bytes] 

824 

825 def __init__(self) -> None: 

826 super().__init__() 

827 self._tagger = None 

828 self._tag_time = None 

829 self._tag_timezone = None 

830 self._tag_timezone_neg_utc = False 

831 self._signature: Optional[bytes] = None 

832 

833 @classmethod 

834 def from_path(cls, filename: Union[str, bytes]) -> "Tag": 

835 tag = ShaFile.from_path(filename) 

836 if not isinstance(tag, cls): 

837 raise NotTagError(filename) 

838 return tag 

839 

840 def check(self) -> None: 

841 """Check this object for internal consistency. 

842 

843 Raises: 

844 ObjectFormatException: if the object is malformed in some way 

845 """ 

846 super().check() 

847 assert self._chunked_text is not None 

848 self._check_has_member("_object_sha", "missing object sha") 

849 self._check_has_member("_object_class", "missing object type") 

850 self._check_has_member("_name", "missing tag name") 

851 

852 if not self._name: 

853 raise ObjectFormatException("empty tag name") 

854 

855 if self._object_sha is None: 

856 raise ObjectFormatException("missing object sha") 

857 check_hexsha(self._object_sha, "invalid object sha") 

858 

859 if self._tagger is not None: 

860 check_identity(self._tagger, "invalid tagger") 

861 

862 self._check_has_member("_tag_time", "missing tag time") 

863 if self._tag_time is None: 

864 raise ObjectFormatException("missing tag time") 

865 check_time(self._tag_time) 

866 

867 last = None 

868 for field, _ in _parse_message(self._chunked_text): 

869 if field == _OBJECT_HEADER and last is not None: 

870 raise ObjectFormatException("unexpected object") 

871 elif field == _TYPE_HEADER and last != _OBJECT_HEADER: 

872 raise ObjectFormatException("unexpected type") 

873 elif field == _TAG_HEADER and last != _TYPE_HEADER: 

874 raise ObjectFormatException("unexpected tag name") 

875 elif field == _TAGGER_HEADER and last != _TAG_HEADER: 

876 raise ObjectFormatException("unexpected tagger") 

877 last = field 

878 

879 def _serialize(self) -> list[bytes]: 

880 headers = [] 

881 if self._object_sha is None: 

882 raise ObjectFormatException("missing object sha") 

883 headers.append((_OBJECT_HEADER, self._object_sha)) 

884 if self._object_class is None: 

885 raise ObjectFormatException("missing object class") 

886 headers.append((_TYPE_HEADER, self._object_class.type_name)) 

887 if self._name is None: 

888 raise ObjectFormatException("missing tag name") 

889 headers.append((_TAG_HEADER, self._name)) 

890 if self._tagger: 

891 if self._tag_time is None: 

892 headers.append((_TAGGER_HEADER, self._tagger)) 

893 else: 

894 if self._tag_timezone is None or self._tag_timezone_neg_utc is None: 

895 raise ObjectFormatException("missing timezone info") 

896 headers.append( 

897 ( 

898 _TAGGER_HEADER, 

899 format_time_entry( 

900 self._tagger, 

901 self._tag_time, 

902 (self._tag_timezone, self._tag_timezone_neg_utc), 

903 ), 

904 ) 

905 ) 

906 

907 if self.message is None and self._signature is None: 

908 body = None 

909 else: 

910 body = (self.message or b"") + (self._signature or b"") 

911 return list(_format_message(headers, body)) 

912 

913 def _deserialize(self, chunks: list[bytes]) -> None: 

914 """Grab the metadata attached to the tag.""" 

915 self._tagger = None 

916 self._tag_time = None 

917 self._tag_timezone = None 

918 self._tag_timezone_neg_utc = False 

919 for field, value in _parse_message(chunks): 

920 if field == _OBJECT_HEADER: 

921 self._object_sha = value 

922 elif field == _TYPE_HEADER: 

923 assert isinstance(value, bytes) 

924 obj_class = object_class(value) 

925 if not obj_class: 

926 raise ObjectFormatException(f"Not a known type: {value!r}") 

927 self._object_class = obj_class 

928 elif field == _TAG_HEADER: 

929 self._name = value 

930 elif field == _TAGGER_HEADER: 

931 if value is None: 

932 raise ObjectFormatException("missing tagger value") 

933 ( 

934 self._tagger, 

935 self._tag_time, 

936 (self._tag_timezone, self._tag_timezone_neg_utc), 

937 ) = parse_time_entry(value) 

938 elif field is None: 

939 if value is None: 

940 self._message = None 

941 self._signature = None 

942 else: 

943 try: 

944 sig_idx = value.index(BEGIN_PGP_SIGNATURE) 

945 except ValueError: 

946 self._message = value 

947 self._signature = None 

948 else: 

949 self._message = value[:sig_idx] 

950 self._signature = value[sig_idx:] 

951 else: 

952 raise ObjectFormatException( 

953 f"Unknown field {field.decode('ascii', 'replace')}" 

954 ) 

955 

956 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

957 """Get the object pointed to by this tag. 

958 

959 Returns: tuple of (object class, sha). 

960 """ 

961 if self._object_class is None or self._object_sha is None: 

962 raise ValueError("Tag object is not properly initialized") 

963 return (self._object_class, self._object_sha) 

964 

965 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

966 (self._object_class, self._object_sha) = value 

967 self._needs_serialization = True 

968 

    # Assigning through the property invalidates the cached serialization
    # (see _set_object above).
    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

985 

986 def sign(self, keyid: Optional[str] = None) -> None: 

987 import gpg 

988 

989 with gpg.Context(armor=True) as c: 

990 if keyid is not None: 

991 key = c.get_key(keyid) 

992 with gpg.Context(armor=True, signers=[key]) as ctx: 

993 self.signature, unused_result = ctx.sign( 

994 self.as_raw_string(), 

995 mode=gpg.constants.sig.mode.DETACH, 

996 ) 

997 else: 

998 self.signature, unused_result = c.sign( 

999 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1000 ) 

1001 

1002 def raw_without_sig(self) -> bytes: 

1003 """Return raw string serialization without the GPG/SSH signature. 

1004 

1005 self.signature is a signature for the returned raw byte string serialization. 

1006 """ 

1007 ret = self.as_raw_string() 

1008 if self._signature: 

1009 ret = ret[: -len(self._signature)] 

1010 return ret 

1011 

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        # Unsigned tags trivially pass verification.
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # Verifies the signature against the serialization with the
            # signature stripped; raises BadSignatures on mismatch.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the inner loop iterates ``keys`` again
                    # rather than ``key.subkeys``, so subkeys of the trusted
                    # keys are never consulted — confirm this is intended.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1044 

1045 

class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
    """Named tuple encapsulating a single tree entry."""

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Directory prefix (bytes) to join in front of this entry's path.

        Raises:
          TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Report the value that actually failed the check (self.path);
            # the old message misleadingly showed the prefix argument instead.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1054 

1055 

def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a serialized tree body.

    Args:
      text: Serialized text to parse
      strict: If True, reject zero-padded octal modes.
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Each entry is "<octal mode> <name>\0<20-byte raw sha>".
        mode_sep = text.index(b" ", pos)
        raw_mode = text[pos:mode_sep]
        if strict and raw_mode.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}")
        try:
            mode = int(raw_mode, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}") from exc
        name_sep = text.index(b"\0", mode_sep)
        name = text[mode_sep + 1 : name_sep]
        pos = name_sep + 21
        raw_sha = text[name_sep + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))

1085 

1086 

def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks, one chunk per entry
    """
    for name, mode, hexsha in items:
        # Each chunk is "<octal mode> <name>\0<raw 20-byte sha>".
        mode_field = f"{mode:04o}".encode("ascii")
        yield mode_field + b" " + name + b"\0" + hex_to_sha(hexsha)

1098 

1099 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1123 

1124 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry, in tree order.

    Directory entries sort as if their name had a trailing '/'.

    Args:
      entry: (name, (mode, sha)) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1135 

1136 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name

1140 

1141 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Encoding used when decoding the entry name for display.
    Returns: string describing the tree entry
    """
    # Anything with the directory bit set is rendered as a tree.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1163 

1164 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record the path at which the submodule sits and its gitlink sha."""
        self.path, self.sha = path, sha

1171 

1172 

class Tree(ShaFile):
    """A Git tree object.

    Maps entry names (bytes) to (mode, hexsha) tuples and exposes a
    dict-like interface, plus (de)serialization to/from the git tree
    wire format.
    """

    type_name = b"tree"
    type_num = 2

    # A bare string is a valid __slots__ declaration for a single slot.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Create an empty tree."""
        super().__init__()
        # name -> (mode, hexsha)
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Load a tree from a loose-object file on disk.

        Raises:
          NotTreeError: if the file holds some other object type.
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(filename)
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check whether an entry with the given name exists."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Return the (mode, hexsha) tuple for the named entry."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        # Invalidate the cached serialization of this tree.
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Remove the named entry and invalidate the cached serialization."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return the number of entries in this tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over entry names (in dict insertion order, not tree order)."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            # parse_tree raises plain ValueError for truncated input
            # (bytes.index misses); normalize to an object-format error.
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        # strict=True rejects zero-padded modes.
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must appear in tree order (directories sort with a
            # trailing '/') and names must be unique.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize the entries in tree order, one chunk per entry."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable, ls-tree-like listing of this tree."""
        text: list[str] = []
        for name, mode, hexsha in self.iteritems():
            text.append(pretty_format_tree_entry(name, mode, hexsha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if a gitlink is hit before the path ends.
          NotTreeError: if an intermediate component is not a tree.
          ValueError: if the path contains no usable components.
        """
        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            # Skip empty components (leading/trailing/duplicate slashes).
            if not p:
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1331 

1332 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    magnitude = int(text[1:])
    if sign == b"-":
        magnitude = -magnitude
    # "-0000"-style zones: a minus sign that yields a non-negative offset.
    negated_utc = magnitude >= 0 and sign == b"-"
    direction = -1 if magnitude < 0 else 1
    magnitude = abs(magnitude)
    hours, minutes = divmod(magnitude, 100)
    return (
        direction * (hours * 3600 + minutes * 60),
        negated_utc,
    )

1360 

1361 

1362def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes: 

1363 """Format a timezone for Git serialization. 

1364 

1365 Args: 

1366 offset: Timezone offset as seconds difference to UTC 

1367 unnecessary_negative_timezone: Whether to use a minus sign for 

1368 UTC or positive timezones (-0000 and --700 rather than +0000 / +0700). 

1369 """ 

1370 if offset % 60 != 0: 

1371 raise ValueError("Unable to handle non-minute offset.") 

1372 if offset < 0 or unnecessary_negative_timezone: 

1373 sign = "-" 

1374 offset = -offset 

1375 else: 

1376 sign = "+" 

1377 return ("%c%02d%02d" % (sign, offset / 3600, (offset / 60) % 60)).encode("ascii") # noqa: UP031 

1378 

1379 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse an author/committer/tagger line.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException: in case of parsing error (malformed
        field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    # The identity ends at the last "> "; everything after is the timestamp.
    sep = value.rfind(b"> ")
    if sep == -1:
        # No timestamp portion at all: identity only.
        return (value, None, (None, False))
    try:
        person = value[: sep + 1]
        timetext, timezonetext = value[sep + 2 :].rsplit(b" ", 1)
        time = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, time, (timezone, timezone_neg_utc)

1405 

1406 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an author/committer/tagger line for serialization."""
    timezone, timezone_neg_utc = timezone_info
    timestamp = str(time).encode("ascii")
    zone = format_timezone(timezone, timezone_neg_utc)
    return person + b" " + timestamp + b" " + zone

1415 

1416 

@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated (see the ``replace_me`` decorator); ``Commit._deserialize``
    implements the same header dispatch.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
      encoding, mergetag, gpgsig, message, extra)

    Raises:
      ObjectFormatException: if a header that requires a value has none.
    """
    parents = []
    extra = []
    tree = None
    # (identity, timestamp, (tz offset, tz-negative-utc flag)); all None
    # until the corresponding header is seen.
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Mergetag values are embedded tag objects; re-add the trailing
            # newline stripped during header continuation parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # The unnamed trailing section is the commit message.
            message = value
        else:
            # Unknown headers are preserved verbatim for reserialization.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1495 

1496 

class Commit(ShaFile):
    """A git commit object.

    Holds a tree, zero or more parents, author/committer identities with
    timestamps and timezones, optional message encoding, mergetags, a GPG
    signature, unknown extra headers (kept verbatim) and the message.
    """

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Create an empty commit with no parents or signature."""
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: Optional[bytes] = None
        self._mergetag: list[Tag] = []
        self._gpgsig: Optional[bytes] = None
        self._extra: list[tuple[bytes, Optional[bytes]]] = []
        self._author_timezone_neg_utc: Optional[bool] = False
        self._commit_timezone_neg_utc: Optional[bool] = False

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Commit":
        """Load a commit from a loose-object file on disk.

        Raises:
          NotCommitError: if the file holds some other object type.
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(path)
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields from the parsed message chunks.

        Raises:
          ObjectFormatException: if author/committer headers lack a value.
        """
        self._parents = []
        self._extra = []
        self._tree = None
        # (identity, timestamp, (tz offset, tz-negative-utc)); filled in
        # when the corresponding header is seen.
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Mergetag values embed a full tag object; restore the
                # trailing newline stripped during header parsing.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # The unnamed trailing section is the commit message.
                self._message = value
            else:
                # Unknown headers are kept verbatim for reserialization.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce the header order git requires:
        # tree, parent*, author, committer, [encoding].
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached, armored GPG signature and store it as gpgsig.

        Args:
          keyid: Optional id of the key to sign with; when omitted, gpg's
            default signing key is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Serialize a copy with the signature removed.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        # NOTE(review): setting both the slot and the property looks
        # redundant — the property assignment alone should clear the slot
        # and mark the copy for reserialization; confirm before simplifying.
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        # Unsigned commits trivially pass verification.
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # Verifies against the serialization without the signature;
            # raises BadSignatures on mismatch.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the inner loop iterates ``keys`` again
                    # rather than ``key.subkeys``, so subkeys of the trusted
                    # keys are never consulted — confirm this is intended.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message back to the commit wire format."""
        headers = []
        assert self._tree is not None
        # _tree may hold either a Tree object or its hex sha.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the tag's trailing newline; _format_message re-adds it.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

1812 

1813 

# All object classes this module can (de)serialize.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps both the textual type name (b"commit", ...) and the numeric type
# code to the implementing class.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls


# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Rust extension not built/installed; keep the Python implementations.
    pass
else:
    # Shadow the Python implementations with the faster Rust ones.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs