Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1020 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

# Public API of this module.  Several names (Commit, Tree, TreeEntry,
# parse_tree, sorted_tree_items, ...) are defined later in the file,
# beyond this excerpt.
__all__ = [
    "BEGIN_PGP_SIGNATURE",
    "BEGIN_SSH_SIGNATURE",
    "MAX_TIME",
    "OBJECT_CLASSES",
    "SIGNATURE_PGP",
    "SIGNATURE_SSH",
    "S_IFGITLINK",
    "S_ISGITLINK",
    "ZERO_SHA",
    "Blob",
    "Commit",
    "EmptyFileException",
    "FixedSha",
    "ObjectID",
    "RawObjectID",
    "ShaFile",
    "SubmoduleEncountered",
    "Tag",
    "Tree",
    "TreeEntry",
    "check_hexsha",
    "check_identity",
    "check_time",
    "filename_to_hex",
    "format_time_entry",
    "format_timezone",
    "git_line",
    "hex_to_filename",
    "hex_to_sha",
    "is_blob",
    "is_commit",
    "is_tag",
    "is_tree",
    "key_entry",
    "key_entry_name_order",
    "object_class",
    "object_header",
    "parse_commit",
    "parse_time_entry",
    "parse_timezone",
    "parse_tree",
    "pretty_format_tree_entry",
    "serializable_property",
    "serialize_tree",
    "sha_to_hex",
    "sorted_tree_items",
    "valid_hexsha",
]

74 

75import binascii 

76import os 

77import posixpath 

78import stat 

79import sys 

80import zlib 

81from collections.abc import Callable, Iterable, Iterator, Sequence 

82from hashlib import sha1 

83from io import BufferedIOBase, BytesIO 

84from typing import ( 

85 IO, 

86 TYPE_CHECKING, 

87 NamedTuple, 

88 TypeVar, 

89) 

90 

91if sys.version_info >= (3, 11): 

92 from typing import Self 

93else: 

94 from typing_extensions import Self 

95 

96from typing import NewType, TypeGuard 

97 

98from . import replace_me 

99from .errors import ( 

100 ChecksumMismatch, 

101 FileFormatException, 

102 NotBlobError, 

103 NotCommitError, 

104 NotTagError, 

105 NotTreeError, 

106 ObjectFormatException, 

107) 

108from .file import GitFile 

109 

110if TYPE_CHECKING: 

111 from _hashlib import HASH 

112 

113 from .file import _GitFile 

114 

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects (used by tag objects)
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# Tree-entry mode marking a gitlink (submodule reference)
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"


# Hex SHA type (40 ASCII hex digits)
ObjectID = NewType("ObjectID", bytes)

# Raw SHA type (20 binary bytes)
RawObjectID = NewType("RawObjectID", bytes)

# Zero SHA constant
ZERO_SHA: ObjectID = ObjectID(b"0" * 40)

152 

153 

class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""

    # Raised by ShaFile._parse_file when a loose object file has no content.

156 

157 

def S_ISGITLINK(m: int) -> bool:
    """Tell whether a file mode denotes a submodule (gitlink) entry.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    file_type = stat.S_IFMT(m)
    return file_type == S_IFGITLINK

166 

167 

168def _decompress(string: bytes) -> bytes: 

169 dcomp = zlib.decompressobj() 

170 dcomped = dcomp.decompress(string) 

171 dcomped += dcomp.flush() 

172 return dcomped 

173 

174 

def sha_to_hex(sha: RawObjectID) -> ObjectID:
    """Takes a string and returns the hex of the sha within."""
    hex_form = binascii.hexlify(sha)
    assert len(hex_form) == 40, f"Incorrect length of sha1 string: {hex_form!r}"
    return ObjectID(hex_form)

180 

181 

def hex_to_sha(hex: ObjectID | str) -> RawObjectID:
    """Takes a hex sha and returns a binary sha."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        raw = binascii.unhexlify(hex)
    except TypeError as exc:
        # For non-bytes input, propagate the TypeError untouched; for bytes
        # input, normalize the failure to a ValueError.
        if not isinstance(hex, bytes):
            raise
        raise ValueError(exc.args[0]) from exc
    return RawObjectID(raw)

191 

192 

193def valid_hexsha(hex: bytes | str) -> bool: 

194 """Check if a string is a valid hex SHA. 

195 

196 Args: 

197 hex: Hex string to check 

198 

199 Returns: 

200 True if valid hex SHA, False otherwise 

201 """ 

202 if len(hex) != 40: 

203 return False 

204 try: 

205 binascii.unhexlify(hex) 

206 except (TypeError, binascii.Error): 

207 return False 

208 else: 

209 return True 

210 

211 

# Constrained type variable: path helpers accept str or bytes and promise to
# return the same flavour they were given.
PathT = TypeVar("PathT", str, bytes)

213 

214 

def hex_to_filename(path: PathT, hex: str | bytes) -> PathT:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join insists on a single type for all components, so coerce the
    # hex digest to whatever flavour ``path`` is. The first two digits become
    # the fan-out directory, the remaining 38 the file name.
    if isinstance(path, str):
        text = hex.decode("ascii") if isinstance(hex, bytes) else hex
        joined = os.path.join(path, text[:2], text[2:])
        assert isinstance(joined, str)
        return joined
    else:
        raw = hex.encode("ascii") if isinstance(hex, str) else hex
        joined_b = os.path.join(path, raw[:2], raw[2:])
        assert isinstance(joined_b, bytes)
        return joined_b

241 

242 

def filename_to_hex(filename: str | bytes) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # Rebuild the 40-digit sha from the last two path components: a 2-char
    # fan-out directory plus a 38-char file name.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        parts = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        prefix, suffix = parts
        assert len(prefix) == 2 and len(suffix) == 38, errmsg
        hex_bytes = (prefix + suffix).encode("ascii")
    else:
        # filename is bytes; match the separator type accordingly
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        parts_b = filename.rsplit(sep, 2)[-2:]
        assert len(parts_b) == 2, errmsg
        prefix_b, suffix_b = parts_b
        assert len(prefix_b) == 2 and len(suffix_b) == 38, errmsg
        hex_bytes = prefix_b + suffix_b
    hex_to_sha(ObjectID(hex_bytes))  # raises if not valid hex
    return hex_bytes.decode("ascii")

266 

267 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    obj_cls = object_class(num_type)
    if obj_cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    size_bytes = str(length).encode("ascii")
    return obj_cls.type_name + b" " + size_bytes + b"\0"

274 

275 

276def serializable_property(name: str, docstring: str | None = None) -> property: 

277 """A property that helps tracking whether serialization is necessary.""" 

278 

279 def set(obj: "ShaFile", value: object) -> None: 

280 """Set the property value and mark the object as needing serialization. 

281 

282 Args: 

283 obj: The ShaFile object 

284 value: The value to set 

285 """ 

286 setattr(obj, "_" + name, value) 

287 obj._needs_serialization = True 

288 

289 def get(obj: "ShaFile") -> object: 

290 """Get the property value. 

291 

292 Args: 

293 obj: The ShaFile object 

294 

295 Returns: 

296 The property value 

297 """ 

298 return getattr(obj, "_" + name) 

299 

300 return property(get, set, doc=docstring) 

301 

302 

def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Look up the ShaFile subclass for a type name or numeric type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)

312 

313 

def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Validate a hex sha string, raising on failure.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")

325 

326 

327def check_identity(identity: bytes | None, error_msg: str) -> None: 

328 """Check if the specified identity is valid. 

329 

330 This will raise an exception if the identity is not valid. 

331 

332 Args: 

333 identity: Identity string 

334 error_msg: Error message to use in exception 

335 """ 

336 if identity is None: 

337 raise ObjectFormatException(error_msg) 

338 email_start = identity.find(b"<") 

339 email_end = identity.find(b">") 

340 if not all( 

341 [ 

342 email_start >= 1, 

343 identity[email_start - 1] == b" "[0], 

344 identity.find(b"<", email_start + 1) == -1, 

345 email_end == len(identity) - 1, 

346 b"\0" not in identity, 

347 b"\n" not in identity, 

348 ] 

349 ): 

350 raise ObjectFormatException(error_msg) 

351 

352 

353def _path_to_bytes(path: str | bytes) -> bytes: 

354 """Convert a path to bytes for use in error messages.""" 

355 if isinstance(path, str): 

356 return path.encode("utf-8", "surrogateescape") 

357 return path 

358 

359 

def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Reject values that would overflow a signed 64-bit integer.
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

372 

373 

def git_line(*items: bytes) -> bytes:
    """Join the items with single spaces and terminate with a newline."""
    joined = b" ".join(items)
    return joined + b"\n"

377 

378 

class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: str | bytes) -> None:
        """Store a predetermined SHA value.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        encoded = hexsha.encode("ascii") if isinstance(hexsha, str) else hexsha
        if not isinstance(encoded, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {encoded!r}")
        self._hexsha = encoded
        self._sha = hex_to_sha(ObjectID(encoded))

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")

404 

405 

# Type guard functions for runtime type narrowing.
# The definitions are duplicated: under TYPE_CHECKING the return type is a
# TypeGuard so static checkers can narrow ShaFile to the concrete subclass;
# at runtime a plain bool is returned. Keep both branches in sync.
if TYPE_CHECKING:

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"

441 

442 

class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when parsed fields have been mutated and the cached chunked
    # text (and therefore the SHA) must be regenerated before use.
    _needs_serialization: bool
    # Object type name (e.g. b"blob"); set by each subclass.
    type_name: bytes
    # Numeric object type; set by each subclass.
    type_num: int
    # Cached serialized representation as a list of byte chunks.
    _chunked_text: list[bytes] | None
    # Cached SHA object (real hashlib hash or FixedSha); None when invalid.
    _sha: "FixedSha | None | HASH"

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the NUL terminating the "<type> <size>"
        # header appears in the decompressed data.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL-terminated header is the object content.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA before regenerating the chunks.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(
        self, text: bytes, sha: ObjectID | RawObjectID | None = None
    ) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: ObjectID | RawObjectID | None = None
    ) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            # Trust the caller-supplied SHA instead of recomputing it.
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Bits 4-6 of the first byte carry the numeric object type.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is a variable-length quantity: consume bytes while the
        # continuation (high) bit is set.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        # A legacy loose object starts directly with a zlib stream: the low
        # nibble of the first byte is 8 (deflate) and the two-byte header is
        # a multiple of 31 (RFC 1950 header check).
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        # Subclasses populate their parsed fields from the raw chunks.
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        # Subclasses produce raw chunks from their parsed fields.
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: str | bytes) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: ObjectID | RawObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: ObjectID | RawObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip through deserialize/serialize; a changed SHA means
            # the parsed fields do not faithfully reproduce the raw content.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        # "<type> <size>\0" prefix hashed together with the content.
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> "FixedSha | HASH":
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> ObjectID:
        """The hex SHA of this object."""
        return ObjectID(self.sha().hexdigest().encode("ascii"))

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

762 

763 

class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new Blob object."""
        super().__init__()
        self._chunked_text = []
        # A blob's raw content IS its serialization, so a fresh empty blob
        # is already "serialized".
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        # Joined view of the chunked content.
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        # Content is stored verbatim; no fields to format.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        # Content is stored verbatim; no fields to parse.
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: str | bytes) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # Chunk boundaries need not coincide with line boundaries:
        # ``remaining`` carries the trailing partial line of a chunk into
        # the next one.
        remaining = None
        ret = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if len(lines) > 1:
                # First line completes the carried-over fragment; the last
                # line may itself be incomplete, so carry it forward.
                ret.append((remaining or b"") + lines[0])
                ret.extend(lines[1:-1])
                remaining = lines[-1]
            elif len(lines) == 1:
                # Chunk contains no newline split point; extend the fragment.
                if remaining is None:
                    remaining = lines.pop()
                else:
                    remaining += lines.pop()
        if remaining is not None:
            ret.append(remaining)
        return ret

861 

862 

863def _parse_message( 

864 chunks: Iterable[bytes], 

865) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]: 

866 """Parse a message with a list of fields and a body. 

867 

868 Args: 

869 chunks: the raw chunks of the tag or commit object. 

870 Returns: iterator of tuples of (field, value), one per header line, in the 

871 order read from the text, possibly including duplicates. Includes a 

872 field named None for the freeform tag/commit text. 

873 """ 

874 f = BytesIO(b"".join(chunks)) 

875 k = None 

876 v = b"" 

877 eof = False 

878 

879 def _strip_last_newline(value: bytes) -> bytes: 

880 """Strip the last newline from value.""" 

881 if value and value.endswith(b"\n"): 

882 return value[:-1] 

883 return value 

884 

885 # Parse the headers 

886 # 

887 # Headers can contain newlines. The next line is indented with a space. 

888 # We store the latest key as 'k', and the accumulated value as 'v'. 

889 for line in f: 

890 if line.startswith(b" "): 

891 # Indented continuation of the previous line 

892 v += line[1:] 

893 else: 

894 if k is not None: 

895 # We parsed a new header, return its value 

896 yield (k, _strip_last_newline(v)) 

897 if line == b"\n": 

898 # Empty line indicates end of headers 

899 break 

900 (k, v) = line.split(b" ", 1) 

901 

902 else: 

903 # We reached end of file before the headers ended. We still need to 

904 # return the previous header, then we need to return a None field for 

905 # the text. 

906 eof = True 

907 if k is not None: 

908 yield (k, _strip_last_newline(v)) 

909 yield (None, None) 

910 

911 if not eof: 

912 # We didn't reach the end of file while parsing headers. We can return 

913 # the rest of the file as a message. 

914 yield (None, f.read()) 

915 

916 f.close() 

917 

918 

919def _format_message( 

920 headers: Sequence[tuple[bytes, bytes]], body: bytes | None 

921) -> Iterator[bytes]: 

922 for field, value in headers: 

923 lines = value.split(b"\n") 

924 yield git_line(field, lines[0]) 

925 for line in lines[1:]: 

926 yield b" " + line + b"\n" 

927 yield b"\n" # There must be a new line after the headers 

928 if body: 

929 yield body 

930 

931 

932class Tag(ShaFile): 

933 """A Git Tag object.""" 

934 

935 type_name = b"tag" 

936 type_num = 4 

937 

938 __slots__ = ( 

939 "_message", 

940 "_name", 

941 "_object_class", 

942 "_object_sha", 

943 "_signature", 

944 "_tag_time", 

945 "_tag_timezone", 

946 "_tag_timezone_neg_utc", 

947 "_tagger", 

948 ) 

949 

950 _message: bytes | None 

951 _name: bytes | None 

952 _object_class: "type[ShaFile] | None" 

953 _object_sha: bytes | None 

954 _signature: bytes | None 

955 _tag_time: int | None 

956 _tag_timezone: int | None 

957 _tag_timezone_neg_utc: bool | None 

958 _tagger: bytes | None 

959 

960 def __init__(self) -> None: 

961 """Initialize a new Tag object.""" 

962 super().__init__() 

963 self._tagger = None 

964 self._tag_time = None 

965 self._tag_timezone = None 

966 self._tag_timezone_neg_utc = False 

967 self._signature: bytes | None = None 

968 

969 @classmethod 

970 def from_path(cls, filename: str | bytes) -> "Tag": 

971 """Read a tag from a file on disk. 

972 

973 Args: 

974 filename: Path to the tag file 

975 

976 Returns: 

977 A Tag object 

978 

979 Raises: 

980 NotTagError: If the file is not a tag 

981 """ 

982 tag = ShaFile.from_path(filename) 

983 if not isinstance(tag, cls): 

984 raise NotTagError(_path_to_bytes(filename)) 

985 return tag 

986 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        # object sha, object type and tag name are mandatory headers.
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        # Tagger is optional, but must be a valid identity when present.
        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Re-parse the raw text to enforce strict header ordering:
        # object, then type, then tag, then (optionally) tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

1025 

    def _serialize(self) -> list[bytes]:
        """Serialize this tag into chunks (headers followed by the body).

        Raises:
          ObjectFormatException: if a mandatory field is missing
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without timestamp: emit the bare identity.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # A detached signature, if present, is stored appended to the
            # message body (mirrors the split done in _deserialize).
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

1059 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        # Reset optional fields so stale state never survives a re-parse.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # field is None for the message body, which may carry a
                # trailing detached signature.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        # Split: everything before the marker is the message,
                        # the marker onwards is the signature.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

1110 

1111 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

1112 """Get the object pointed to by this tag. 

1113 

1114 Returns: tuple of (object class, sha). 

1115 """ 

1116 if self._object_class is None or self._object_sha is None: 

1117 raise ValueError("Tag object is not properly initialized") 

1118 return (self._object_class, self._object_sha) 

1119 

1120 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

1121 (self._object_class, self._object_sha) = value 

1122 self._needs_serialization = True 

1123 

    # (object class, hexsha) pair of the object this tag points at.
    object = property(_get_object, _set_object)

    # Serialized fields; setting any of them marks the tag for
    # re-serialization.
    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

1140 

1141 def sign(self, keyid: str | None = None) -> None: 

1142 """Sign this tag with a GPG key. 

1143 

1144 Args: 

1145 keyid: Optional GPG key ID to use for signing. If not specified, 

1146 the default GPG key will be used. 

1147 """ 

1148 import gpg 

1149 

1150 with gpg.Context(armor=True) as c: 

1151 if keyid is not None: 

1152 key = c.get_key(keyid) 

1153 with gpg.Context(armor=True, signers=[key]) as ctx: 

1154 self.signature, _unused_result = ctx.sign( 

1155 self.as_raw_string(), 

1156 mode=gpg.constants.sig.mode.DETACH, 

1157 ) 

1158 else: 

1159 self.signature, _unused_result = c.sign( 

1160 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1161 ) 

1162 

1163 def raw_without_sig(self) -> bytes: 

1164 """Return raw string serialization without the GPG/SSH signature. 

1165 

1166 self.signature is a signature for the returned raw byte string serialization. 

1167 """ 

1168 ret = self.as_raw_string() 

1169 if self._signature: 

1170 ret = ret[: -len(self._signature)] 

1171 return ret 

1172 

1173 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]: 

1174 """Extract the payload, signature, and signature type from this tag. 

1175 

1176 Returns: 

1177 tuple of (``payload``, ``signature``, ``signature_type``) where: 

1178 

1179 - ``payload``: The raw tag data without the signature 

1180 - ``signature``: The signature bytes if present, None otherwise 

1181 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1182 

1183 Raises: 

1184 ObjectFormatException: If signature has unknown format 

1185 """ 

1186 if self._signature is None: 

1187 return self.as_raw_string(), None, None 

1188 

1189 payload = self.raw_without_sig() 

1190 

1191 # Determine signature type 

1192 if self._signature.startswith(BEGIN_PGP_SIGNATURE): 

1193 sig_type = SIGNATURE_PGP 

1194 elif self._signature.startswith(BEGIN_SSH_SIGNATURE): 

1195 sig_type = SIGNATURE_SSH 

1196 else: 

1197 raise ObjectFormatException("Unknown signature format") 

1198 

1199 return payload, self._signature, sig_type 

1200 

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        # Unsigned tags trivially verify.
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # Verify the payload (serialization minus trailing signature)
            # against the detached signature.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # matches any signature fingerprint.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1233 

1234 

class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes
    mode: int
    sha: ObjectID

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Path prefix to prepend to this entry's path.

        Returns:
          A new TreeEntry with ``path`` joined in front of ``self.path``.

        Raises:
          TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Report the value that actually failed the check (self.path);
            # previously the message showed the (possibly valid) prefix
            # argument instead, which made the error misleading.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1247 

1248 

def parse_tree(
    text: bytes, strict: bool = False
) -> Iterator[tuple[bytes, int, ObjectID]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, enforce strict validation
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    # Each entry is "<octal mode> <name>\0<20 raw sha bytes>".
    while pos < end:
        space_at = text.index(b" ", pos)
        mode_text = text[pos:space_at]
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]
        pos = nul_at + 21
        raw_sha = text[nul_at + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(RawObjectID(raw_sha)))

1281 

1282 

def serialize_tree(items: Iterable[tuple[bytes, int, ObjectID]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # "<octal mode> <name>\0<raw sha>" per entry.
        mode_bytes = f"{mode:04o}".encode("ascii")
        yield mode_bytes + b" " + name + b"\0" + hex_to_sha(hexsha)

1294 

1295 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, ObjectID]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1319 

1320 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Args:
      entry: (name, value) tuple
    """
    # Directories sort as if their name carried a trailing slash.
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1331 

1332 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name

1336 

1337 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: ObjectID, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # Anything with the directory bit set renders as a tree; everything
    # else as a blob.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1360 

1361 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Initialize SubmoduleEncountered exception.

        Args:
          path: Path where the submodule was encountered
          sha: SHA of the submodule
        """
        # Keep both pieces so callers can continue resolution inside the
        # submodule if they choose to.
        self.path = path
        self.sha = sha

1374 

1375 

class Tree(ShaFile):
    """A Git tree object."""

    type_name = b"tree"
    type_num = 2

    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        # Maps entry name -> (mode, hexsha).
        self._entries: dict[bytes, tuple[int, ObjectID]] = {}

    @classmethod
    def from_path(cls, filename: str | bytes) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: ObjectID) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        # Re-parse the raw text in strict mode so malformed modes are
        # rejected rather than silently normalized.
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must appear in canonical tree order, without
            # duplicates.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        # Serialize entries in canonical (tree) order.
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if resolution would descend into a gitlink
          NotTreeError: if an intermediate component is not a tree
          ValueError: if no path component resolved to an entry
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/trailing/doubled slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Descending into a submodule is the caller's responsibility.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1565 

1566 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    negative_sign = text[:1] == b"-"
    magnitude = int(text[1:])
    offset = -magnitude if negative_sign else magnitude
    # "-0000" (or "--700") means UTC-or-positive written with a minus sign.
    unnecessary_negative_timezone = negative_sign and offset >= 0
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # Offsets are written HHMM, e.g. +0530 -> 5 hours 30 minutes.
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1594 

1595 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    sign = "+"
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    hours = offset // 3600
    minutes = (offset // 60) % 60
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")

1612 

1613 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse event.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is the identity, with no
        # time or timezone information attached.
        return (value, None, (None, False))
    try:
        person = value[: sep + 1]
        timetext, timezonetext = value[sep + 2 :].rsplit(b" ", 1)
        when = int(timetext)
        tz_offset, tz_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, when, (tz_offset, tz_neg_utc)

1639 

1640 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event."""
    tz_offset, tz_neg_utc = timezone_info
    fields = [
        person,
        str(time).encode("ascii"),
        format_timezone(tz_offset, tz_neg_utc),
    ]
    return b" ".join(fields)

1649 

1650 

@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated (see the decorator above); kept for backwards compatibility.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)

    Raises:
      ObjectFormatException: if a header that requires a value has none
    """
    parents = []
    extra = []
    tree = None
    # Author/committer default to all-unknown triples until parsed.
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Re-append the newline the Tag parser expects at the end.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # field is None for the commit message body.
            message = value
        else:
            # Unknown headers are preserved verbatim in `extra`.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1733 

1734 

1735class Commit(ShaFile): 

1736 """A git commit object.""" 

1737 

1738 type_name = b"commit" 

1739 type_num = 1 

1740 

1741 __slots__ = ( 

1742 "_author", 

1743 "_author_time", 

1744 "_author_timezone", 

1745 "_author_timezone_neg_utc", 

1746 "_commit_time", 

1747 "_commit_timezone", 

1748 "_commit_timezone_neg_utc", 

1749 "_committer", 

1750 "_encoding", 

1751 "_extra", 

1752 "_gpgsig", 

1753 "_mergetag", 

1754 "_message", 

1755 "_parents", 

1756 "_tree", 

1757 ) 

1758 

1759 def __init__(self) -> None: 

1760 """Initialize an empty Commit.""" 

1761 super().__init__() 

1762 self._parents: list[ObjectID] = [] 

1763 self._encoding: bytes | None = None 

1764 self._mergetag: list[Tag] = [] 

1765 self._gpgsig: bytes | None = None 

1766 self._extra: list[tuple[bytes, bytes | None]] = [] 

1767 self._author_timezone_neg_utc: bool | None = False 

1768 self._commit_timezone_neg_utc: bool | None = False 

1769 

1770 @classmethod 

1771 def from_path(cls, path: str | bytes) -> "Commit": 

1772 """Read a commit from a file on disk. 

1773 

1774 Args: 

1775 path: Path to the commit file 

1776 

1777 Returns: 

1778 A Commit object 

1779 

1780 Raises: 

1781 NotCommitError: If the file is not a commit 

1782 """ 

1783 commit = ShaFile.from_path(path) 

1784 if not isinstance(commit, cls): 

1785 raise NotCommitError(_path_to_bytes(path)) 

1786 return commit 

1787 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Parse commit headers and message from serialized chunks."""
        # Reset all parsed state so a re-parse never keeps stale values.
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(ObjectID(value))
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Re-append the newline the Tag parser expects at the end.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # field is None for the commit message body.
                self._message = value
            else:
                # Unknown headers are preserved verbatim for round-tripping.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

1846 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        # tree, author, committer and both timestamps are mandatory.
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Re-parse the raw text to enforce canonical header ordering:
        # tree, parent*, author, committer, [encoding], ...
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

1898 def sign(self, keyid: str | None = None) -> None: 

1899 """Sign this commit with a GPG key. 

1900 

1901 Args: 

1902 keyid: Optional GPG key ID to use for signing. If not specified, 

1903 the default GPG key will be used. 

1904 """ 

1905 import gpg 

1906 

1907 with gpg.Context(armor=True) as c: 

1908 if keyid is not None: 

1909 key = c.get_key(keyid) 

1910 with gpg.Context(armor=True, signers=[key]) as ctx: 

1911 self.gpgsig, _unused_result = ctx.sign( 

1912 self.as_raw_string(), 

1913 mode=gpg.constants.sig.mode.DETACH, 

1914 ) 

1915 else: 

1916 self.gpgsig, _unused_result = c.sign( 

1917 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1918 ) 

1919 

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Work on a copy so this commit's own state is untouched.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        # Clear the slot directly AND via the public attribute.
        # NOTE(review): the second assignment looks redundant but presumably
        # goes through the gpgsig property so the copy is re-serialized
        # without the signature header — confirm against the property
        # definition before removing either line.
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

1930 

1931 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]: 

1932 """Extract the payload, signature, and signature type from this commit. 

1933 

1934 Returns: 

1935 tuple of (``payload``, ``signature``, ``signature_type``) where: 

1936 

1937 - ``payload``: The raw commit data without the signature 

1938 - ``signature``: The signature bytes if present, None otherwise 

1939 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1940 

1941 Raises: 

1942 ObjectFormatException: If signature has unknown format 

1943 """ 

1944 if self._gpgsig is None: 

1945 return self.as_raw_string(), None, None 

1946 

1947 payload = self.raw_without_sig() 

1948 

1949 # Determine signature type 

1950 if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE): 

1951 sig_type = SIGNATURE_PGP 

1952 elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE): 

1953 sig_type = SIGNATURE_SSH 

1954 else: 

1955 raise ObjectFormatException("Unknown signature format") 

1956 

1957 return payload, self._gpgsig, sig_type 

1958 

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        # Unsigned commits trivially verify.
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # Verify the payload (serialization minus signature header)
            # against the detached signature.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # matches any signature fingerprint.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1991 

1992 def _serialize(self) -> list[bytes]: 

1993 headers = [] 

1994 assert self._tree is not None 

1995 tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree 

1996 headers.append((_TREE_HEADER, tree_bytes)) 

1997 for p in self._parents: 

1998 headers.append((_PARENT_HEADER, p)) 

1999 assert self._author is not None 

2000 assert self._author_time is not None 

2001 assert self._author_timezone is not None 

2002 assert self._author_timezone_neg_utc is not None 

2003 headers.append( 

2004 ( 

2005 _AUTHOR_HEADER, 

2006 format_time_entry( 

2007 self._author, 

2008 self._author_time, 

2009 (self._author_timezone, self._author_timezone_neg_utc), 

2010 ), 

2011 ) 

2012 ) 

2013 assert self._committer is not None 

2014 assert self._commit_time is not None 

2015 assert self._commit_timezone is not None 

2016 assert self._commit_timezone_neg_utc is not None 

2017 headers.append( 

2018 ( 

2019 _COMMITTER_HEADER, 

2020 format_time_entry( 

2021 self._committer, 

2022 self._commit_time, 

2023 (self._commit_timezone, self._commit_timezone_neg_utc), 

2024 ), 

2025 ) 

2026 ) 

2027 if self.encoding: 

2028 headers.append((_ENCODING_HEADER, self.encoding)) 

2029 for mergetag in self.mergetag: 

2030 headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1])) 

2031 headers.extend( 

2032 (field, value) for field, value in self._extra if value is not None 

2033 ) 

2034 if self.gpgsig: 

2035 headers.append((_GPGSIG_HEADER, self.gpgsig)) 

2036 return list(_format_message(headers, self._message)) 

2037 

    # The commit's root tree; stored/compared by SHA (see _serialize).
    tree = serializable_property("tree", "Tree that is the state of this commit")

2039 

2040 def _get_parents(self) -> list[ObjectID]: 

2041 """Return a list of parents of this commit.""" 

2042 return self._parents 

2043 

2044 def _set_parents(self, value: list[ObjectID]) -> None: 

2045 """Set a list of parents of this commit.""" 

2046 self._needs_serialization = True 

2047 self._parents = value 

2048 

    # Read-write attribute backed by _get_parents/_set_parents; assignment
    # marks the commit as needing re-serialization.
    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

2054 

    # Deprecated accessor (scheduled for removal per @replace_me); kept so the
    # ``extra`` property below keeps working in the meantime.
    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, bytes | None]]:
        """Return extra settings of this commit."""
        # Returns the internal list directly: caller mutations affect this
        # commit's state.
        return self._extra

2059 

    # Read-only property; its getter is deprecated (see _get_extra).
    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

2067 

    # Serialized commit fields exposed as attributes. NOTE(review):
    # serializable_property presumably flags the object for re-serialization
    # on assignment (as _set_parents does explicitly) — confirm in its
    # definition.
    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

2100 

2101 

# The concrete git object types handled by this module.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps each object class's type name (bytes) and type number (int) to the
# class itself, so objects can be looked up by either key.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = _TYPE_MAP[cls.type_num] = cls

2114 

2115 

# Keep references to the pure-Python implementations so they remain
# testable even when the accelerated versions below are installed.
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Prefer the Rust-accelerated implementations when the optional
    # extension module is available.
    from dulwich._objects import parse_tree as _parse_tree_rs
    from dulwich._objects import sorted_tree_items as _sorted_tree_items_rs
except ImportError:
    # Extension not built/installed: keep the pure-Python versions bound.
    pass
else:
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs