Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1020 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25import binascii 

26import os 

27import posixpath 

28import stat 

29import sys 

30import zlib 

31from collections.abc import Callable, Iterable, Iterator, Sequence 

32from hashlib import sha1 

33from io import BufferedIOBase, BytesIO 

34from typing import ( 

35 IO, 

36 TYPE_CHECKING, 

37 NamedTuple, 

38 Optional, 

39 TypeVar, 

40 Union, 

41) 

42 

43if sys.version_info >= (3, 11): 

44 from typing import Self 

45else: 

46 from typing_extensions import Self 

47 

48if sys.version_info >= (3, 10): 

49 from typing import TypeGuard 

50else: 

51 from typing_extensions import TypeGuard 

52 

53from . import replace_me 

54from .errors import ( 

55 ChecksumMismatch, 

56 FileFormatException, 

57 NotBlobError, 

58 NotCommitError, 

59 NotTagError, 

60 NotTreeError, 

61 ObjectFormatException, 

62) 

63from .file import GitFile 

64 

65if TYPE_CHECKING: 

66 from _hashlib import HASH 

67 

68 from .file import _GitFile 

69 

# 40 hex zero digits: the sha used to denote an absent/null object id.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# File-mode format bits marking a gitlink (submodule) tree entry;
# tested via S_ISGITLINK below.
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Markers that introduce a detached signature inside a tag/commit body.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"


# Hex object ids are handled as bytes throughout this module.
ObjectID = bytes

102 

103 

class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised by ``ShaFile._parse_file`` when an object file contains no data.
    """

106 

107 

def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    # Compare only the file-type (format) bits of the mode.
    return S_IFGITLINK == stat.S_IFMT(m)

116 

117 

118def _decompress(string: bytes) -> bytes: 

119 dcomp = zlib.decompressobj() 

120 dcomped = dcomp.decompress(string) 

121 dcomped += dcomp.flush() 

122 return dcomped 

123 

124 

def sha_to_hex(sha: "ObjectID") -> bytes:
    """Takes a string and returns the hex of the sha within."""
    hexsha = binascii.hexlify(sha)
    # A 20-byte binary sha always hexlifies to exactly 40 characters.
    assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}"
    return hexsha

130 

131 

def hex_to_sha(hex: Union[bytes, str]) -> bytes:
    """Takes a hex sha and returns a binary sha."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        return binascii.unhexlify(hex)
    except TypeError as exc:
        # Re-raise unchanged when the argument type itself was wrong;
        # otherwise surface malformed bytes input as a ValueError.
        if not isinstance(hex, bytes):
            raise
        raise ValueError(exc.args[0]) from exc

141 

142 

def valid_hexsha(hex: Union[bytes, str]) -> bool:
    """Check if a string is a valid hex SHA.

    Args:
      hex: Hex string to check

    Returns:
      True if valid hex SHA, False otherwise
    """
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
    except (TypeError, binascii.Error):
        # Not decodable as hex digits.
        return False
    return True

160 

161 

# Type variable constrained to str or bytes: functions annotated with PathT
# return a path of the same type they were given.
PathT = TypeVar("PathT", str, bytes)

163 

164 

def hex_to_filename(path: "PathT", hex: Union[str, bytes]) -> "PathT":
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join requires all components to be the same type, so first
    # coerce the sha to match ``path``, then split it into the two-character
    # fan-out directory and the remaining 38-character file name.
    if isinstance(path, str):
        hex_text = hex.decode("ascii") if isinstance(hex, bytes) else hex
        joined = os.path.join(path, hex_text[:2], hex_text[2:])
        assert isinstance(joined, str)
        return joined
    # path is bytes
    hex_raw = hex.encode("ascii") if isinstance(hex, str) else hex
    joined_b = os.path.join(path, hex_raw[:2], hex_raw[2:])
    assert isinstance(joined_b, bytes)
    return joined_b

191 

192 

def filename_to_hex(filename: Union[str, bytes]) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # The sha is spread over the last two path components: a two-character
    # fan-out directory and a 38-character file name.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        parts = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        fan_out, remainder = parts
        assert len(fan_out) == 2 and len(remainder) == 38, errmsg
        hex_bytes = (fan_out + remainder).encode("ascii")
    else:
        # filename is bytes; make the separator bytes too.
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        parts_b = filename.rsplit(sep, 2)[-2:]
        assert len(parts_b) == 2, errmsg
        fan_out_b, remainder_b = parts_b
        assert len(fan_out_b) == 2 and len(remainder_b) == 38, errmsg
        hex_bytes = fan_out_b + remainder_b
    # Validates that the characters are actually hex; raises otherwise.
    hex_to_sha(hex_bytes)
    return hex_bytes.decode("ascii")

216 

217 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    # Loose-object header layout: b"<type> <size>\0".
    return b"".join([cls.type_name, b" ", str(length).encode("ascii"), b"\0"])

224 

225 

def serializable_property(name: str, docstring: Optional[str] = None) -> property:
    """A property that helps tracking whether serialization is necessary."""
    attr = "_" + name

    def _setter(obj: "ShaFile", value: object) -> None:
        """Store the value and flag the owner as needing re-serialization.

        Args:
          obj: The ShaFile object
          value: The value to set
        """
        setattr(obj, attr, value)
        obj._needs_serialization = True

    def _getter(obj: "ShaFile") -> object:
        """Return the stored value.

        Args:
          obj: The ShaFile object

        Returns:
          The property value
        """
        return getattr(obj, attr)

    return property(_getter, _setter, doc=docstring)

251 

252 

def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    # _TYPE_MAP is keyed by both type names and numeric types.
    return _TYPE_MAP.get(type)

262 

263 

def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")

275 

276 

def check_identity(identity: Optional[bytes], error_msg: str) -> None:
    """Check if the specified identity is valid.

    A valid identity has the form ``Name <email>``: at least one byte before
    a single space-preceded ``<``, exactly one ``<``, a ``>`` as the final
    byte, and no NUL or newline bytes anywhere.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is missing or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Short-circuit the checks so identity[email_start - 1] is only evaluated
    # once email_start is known to be >= 1.  The previous all([...]) form
    # built the whole list eagerly, so identities shorter than two bytes with
    # no "<" (e.g. b"" or b"x") raised IndexError instead of
    # ObjectFormatException.
    if (
        email_start < 1
        or identity[email_start - 1] != b" "[0]
        or identity.find(b"<", email_start + 1) != -1
        or email_end != len(identity) - 1
        or b"\0" in identity
        or b"\n" in identity
    ):
        raise ObjectFormatException(error_msg)

301 

302 

303def _path_to_bytes(path: Union[str, bytes]) -> bytes: 

304 """Convert a path to bytes for use in error messages.""" 

305 if isinstance(path, str): 

306 return path.encode("utf-8", "surrogateescape") 

307 return path 

308 

309 

def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Timestamps beyond the signed 64-bit range cannot be represented;
    # reject them up front instead of overflowing later.
    if time_seconds > MAX_TIME:
        raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

322 

323 

def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    line = b" ".join(items)
    return line + b"\n"

327 

328 

class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: Union[str, bytes]) -> None:
        """Record a fixed SHA value, accepting hex text or hex bytes.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        if isinstance(hexsha, str):
            hexsha = hexsha.encode("ascii")
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        # Precompute the binary form so digest() is a plain attribute read.
        self._sha = hex_to_sha(hexsha)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")

354 

355 

356# Type guard functions for runtime type narrowing 

357if TYPE_CHECKING: 

358 

359 def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]: 

360 """Check if a ShaFile is a Commit.""" 

361 return obj.type_name == b"commit" 

362 

363 def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]: 

364 """Check if a ShaFile is a Tree.""" 

365 return obj.type_name == b"tree" 

366 

367 def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]: 

368 """Check if a ShaFile is a Blob.""" 

369 return obj.type_name == b"blob" 

370 

371 def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]: 

372 """Check if a ShaFile is a Tag.""" 

373 return obj.type_name == b"tag" 

374else: 

375 # Runtime versions without type narrowing 

376 def is_commit(obj: "ShaFile") -> bool: 

377 """Check if a ShaFile is a Commit.""" 

378 return obj.type_name == b"commit" 

379 

380 def is_tree(obj: "ShaFile") -> bool: 

381 """Check if a ShaFile is a Tree.""" 

382 return obj.type_name == b"tree" 

383 

384 def is_blob(obj: "ShaFile") -> bool: 

385 """Check if a ShaFile is a Blob.""" 

386 return obj.type_name == b"blob" 

387 

388 def is_tag(obj: "ShaFile") -> bool: 

389 """Check if a ShaFile is a Tag.""" 

390 return obj.type_name == b"tag" 

391 

392 

class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    _needs_serialization: bool
    type_name: bytes
    type_num: int
    _chunked_text: "Optional[list[bytes]]"
    _sha: "Union[FixedSha, None, HASH]"

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: "Union[BufferedIOBase, IO[bytes], _GitFile]"
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the NUL terminating the "<type> <size>" header
        # has been seen.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Re-serializing invalidates any cached SHA.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: "Optional[ObjectID]" = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: "Optional[ObjectID]" = None
    ) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        # Trust the caller-provided sha when given; otherwise recompute lazily.
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: "Union[BufferedIOBase, IO[bytes], _GitFile]"
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Heuristically detect the zlib header of a legacy loose object."""
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        # zlib streams with a 32K window start 0x08 in the low nibble and the
        # two-byte header is a multiple of 31.
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: "Union[BufferedIOBase, IO[bytes], _GitFile]") -> "ShaFile":
        """Read and parse a loose object in either on-disk format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate attributes from serialized chunks (subclass hook)."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Produce serialized chunks from attributes (subclass hook)."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: "Union[BufferedIOBase, IO[bytes], _GitFile]") -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: "Optional[ObjectID]" = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: "Optional[ObjectID]" = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> "Self":
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip the serialized form; any parse error means the
            # object is malformed.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the loose-object header for this object."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> "Union[FixedSha, HASH]":
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

710 

711 

class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new, empty Blob object."""
        super().__init__()
        self._chunked_text = []
        # A fresh blob's chunks ARE its serialization; nothing to do lazily.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            whole: list[bytes] = chunks[0].splitlines(True)
            return whole
        # Lines may straddle chunk boundaries: carry each chunk's (possibly
        # incomplete) final line into the next chunk before emitting it.
        carry = None
        out = []
        for chunk in chunks:
            parts = chunk.splitlines(True)
            if len(parts) > 1:
                out.append((carry or b"") + parts[0])
                out.extend(parts[1:-1])
                carry = parts[-1]
            elif len(parts) == 1:
                if carry is None:
                    carry = parts.pop()
                else:
                    carry += parts.pop()
        if carry is not None:
            out.append(carry)
        return out

809 

810 

811def _parse_message( 

812 chunks: Iterable[bytes], 

813) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]: 

814 """Parse a message with a list of fields and a body. 

815 

816 Args: 

817 chunks: the raw chunks of the tag or commit object. 

818 Returns: iterator of tuples of (field, value), one per header line, in the 

819 order read from the text, possibly including duplicates. Includes a 

820 field named None for the freeform tag/commit text. 

821 """ 

822 f = BytesIO(b"".join(chunks)) 

823 k = None 

824 v = b"" 

825 eof = False 

826 

827 def _strip_last_newline(value: bytes) -> bytes: 

828 """Strip the last newline from value.""" 

829 if value and value.endswith(b"\n"): 

830 return value[:-1] 

831 return value 

832 

833 # Parse the headers 

834 # 

835 # Headers can contain newlines. The next line is indented with a space. 

836 # We store the latest key as 'k', and the accumulated value as 'v'. 

837 for line in f: 

838 if line.startswith(b" "): 

839 # Indented continuation of the previous line 

840 v += line[1:] 

841 else: 

842 if k is not None: 

843 # We parsed a new header, return its value 

844 yield (k, _strip_last_newline(v)) 

845 if line == b"\n": 

846 # Empty line indicates end of headers 

847 break 

848 (k, v) = line.split(b" ", 1) 

849 

850 else: 

851 # We reached end of file before the headers ended. We still need to 

852 # return the previous header, then we need to return a None field for 

853 # the text. 

854 eof = True 

855 if k is not None: 

856 yield (k, _strip_last_newline(v)) 

857 yield (None, None) 

858 

859 if not eof: 

860 # We didn't reach the end of file while parsing headers. We can return 

861 # the rest of the file as a message. 

862 yield (None, f.read()) 

863 

864 f.close() 

865 

866 

867def _format_message( 

868 headers: Sequence[tuple[bytes, bytes]], body: Optional[bytes] 

869) -> Iterator[bytes]: 

870 for field, value in headers: 

871 lines = value.split(b"\n") 

872 yield git_line(field, lines[0]) 

873 for line in lines[1:]: 

874 yield b" " + line + b"\n" 

875 yield b"\n" # There must be a new line after the headers 

876 if body: 

877 yield body 

878 

879 

880class Tag(ShaFile): 

881 """A Git Tag object.""" 

882 

883 type_name = b"tag" 

884 type_num = 4 

885 

886 __slots__ = ( 

887 "_message", 

888 "_name", 

889 "_object_class", 

890 "_object_sha", 

891 "_signature", 

892 "_tag_time", 

893 "_tag_timezone", 

894 "_tag_timezone_neg_utc", 

895 "_tagger", 

896 ) 

897 

898 _message: Optional[bytes] 

899 _name: Optional[bytes] 

900 _object_class: Optional[type["ShaFile"]] 

901 _object_sha: Optional[bytes] 

902 _signature: Optional[bytes] 

903 _tag_time: Optional[int] 

904 _tag_timezone: Optional[int] 

905 _tag_timezone_neg_utc: Optional[bool] 

906 _tagger: Optional[bytes] 

907 

908 def __init__(self) -> None: 

909 """Initialize a new Tag object.""" 

910 super().__init__() 

911 self._tagger = None 

912 self._tag_time = None 

913 self._tag_timezone = None 

914 self._tag_timezone_neg_utc = False 

915 self._signature: Optional[bytes] = None 

916 

917 @classmethod 

918 def from_path(cls, filename: Union[str, bytes]) -> "Tag": 

919 """Read a tag from a file on disk. 

920 

921 Args: 

922 filename: Path to the tag file 

923 

924 Returns: 

925 A Tag object 

926 

927 Raises: 

928 NotTagError: If the file is not a tag 

929 """ 

930 tag = ShaFile.from_path(filename) 

931 if not isinstance(tag, cls): 

932 raise NotTagError(_path_to_bytes(filename)) 

933 return tag 

934 

935 def check(self) -> None: 

936 """Check this object for internal consistency. 

937 

938 Raises: 

939 ObjectFormatException: if the object is malformed in some way 

940 """ 

941 super().check() 

942 assert self._chunked_text is not None 

943 self._check_has_member("_object_sha", "missing object sha") 

944 self._check_has_member("_object_class", "missing object type") 

945 self._check_has_member("_name", "missing tag name") 

946 

947 if not self._name: 

948 raise ObjectFormatException("empty tag name") 

949 

950 if self._object_sha is None: 

951 raise ObjectFormatException("missing object sha") 

952 check_hexsha(self._object_sha, "invalid object sha") 

953 

954 if self._tagger is not None: 

955 check_identity(self._tagger, "invalid tagger") 

956 

957 self._check_has_member("_tag_time", "missing tag time") 

958 if self._tag_time is None: 

959 raise ObjectFormatException("missing tag time") 

960 check_time(self._tag_time) 

961 

962 last = None 

963 for field, _ in _parse_message(self._chunked_text): 

964 if field == _OBJECT_HEADER and last is not None: 

965 raise ObjectFormatException("unexpected object") 

966 elif field == _TYPE_HEADER and last != _OBJECT_HEADER: 

967 raise ObjectFormatException("unexpected type") 

968 elif field == _TAG_HEADER and last != _TYPE_HEADER: 

969 raise ObjectFormatException("unexpected tag name") 

970 elif field == _TAGGER_HEADER and last != _TAG_HEADER: 

971 raise ObjectFormatException("unexpected tagger") 

972 last = field 

973 

974 def _serialize(self) -> list[bytes]: 

975 headers = [] 

976 if self._object_sha is None: 

977 raise ObjectFormatException("missing object sha") 

978 headers.append((_OBJECT_HEADER, self._object_sha)) 

979 if self._object_class is None: 

980 raise ObjectFormatException("missing object class") 

981 headers.append((_TYPE_HEADER, self._object_class.type_name)) 

982 if self._name is None: 

983 raise ObjectFormatException("missing tag name") 

984 headers.append((_TAG_HEADER, self._name)) 

985 if self._tagger: 

986 if self._tag_time is None: 

987 headers.append((_TAGGER_HEADER, self._tagger)) 

988 else: 

989 if self._tag_timezone is None or self._tag_timezone_neg_utc is None: 

990 raise ObjectFormatException("missing timezone info") 

991 headers.append( 

992 ( 

993 _TAGGER_HEADER, 

994 format_time_entry( 

995 self._tagger, 

996 self._tag_time, 

997 (self._tag_timezone, self._tag_timezone_neg_utc), 

998 ), 

999 ) 

1000 ) 

1001 

1002 if self.message is None and self._signature is None: 

1003 body = None 

1004 else: 

1005 body = (self.message or b"") + (self._signature or b"") 

1006 return list(_format_message(headers, body)) 

1007 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        # Optional fields default to unset; mandatory ones are filled below.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # field=None marks the message body (everything after headers).
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    # The signature (if any) trails the message body.
                    if sig_idx is not None:
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

1058 

1059 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

1060 """Get the object pointed to by this tag. 

1061 

1062 Returns: tuple of (object class, sha). 

1063 """ 

1064 if self._object_class is None or self._object_sha is None: 

1065 raise ValueError("Tag object is not properly initialized") 

1066 return (self._object_class, self._object_sha) 

1067 

1068 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

1069 (self._object_class, self._object_sha) = value 

1070 self._needs_serialization = True 

1071 

    # (class, hexsha) pair of the tagged object; setting it invalidates
    # the cached serialization.
    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    # Raw ASCII-armored signature bytes, appended to the message on serialization.
    signature = serializable_property("signature", "Optional detached GPG signature")

1088 

1089 def sign(self, keyid: Optional[str] = None) -> None: 

1090 """Sign this tag with a GPG key. 

1091 

1092 Args: 

1093 keyid: Optional GPG key ID to use for signing. If not specified, 

1094 the default GPG key will be used. 

1095 """ 

1096 import gpg 

1097 

1098 with gpg.Context(armor=True) as c: 

1099 if keyid is not None: 

1100 key = c.get_key(keyid) 

1101 with gpg.Context(armor=True, signers=[key]) as ctx: 

1102 self.signature, _unused_result = ctx.sign( 

1103 self.as_raw_string(), 

1104 mode=gpg.constants.sig.mode.DETACH, 

1105 ) 

1106 else: 

1107 self.signature, _unused_result = c.sign( 

1108 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1109 ) 

1110 

1111 def raw_without_sig(self) -> bytes: 

1112 """Return raw string serialization without the GPG/SSH signature. 

1113 

1114 self.signature is a signature for the returned raw byte string serialization. 

1115 """ 

1116 ret = self.as_raw_string() 

1117 if self._signature: 

1118 ret = ret[: -len(self._signature)] 

1119 return ret 

1120 

1121 def extract_signature(self) -> tuple[bytes, Optional[bytes], Optional[bytes]]: 

1122 """Extract the payload, signature, and signature type from this tag. 

1123 

1124 Returns: 

1125 Tuple of (``payload``, ``signature``, ``signature_type``) where: 

1126 

1127 - ``payload``: The raw tag data without the signature 

1128 - ``signature``: The signature bytes if present, None otherwise 

1129 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1130 

1131 Raises: 

1132 ObjectFormatException: If signature has unknown format 

1133 """ 

1134 if self._signature is None: 

1135 return self.as_raw_string(), None, None 

1136 

1137 payload = self.raw_without_sig() 

1138 

1139 # Determine signature type 

1140 if self._signature.startswith(BEGIN_PGP_SIGNATURE): 

1141 sig_type = SIGNATURE_PGP 

1142 elif self._signature.startswith(BEGIN_SSH_SIGNATURE): 

1143 sig_type = SIGNATURE_SSH 

1144 else: 

1145 raise ObjectFormatException("Unknown signature format") 

1146 

1147 return payload, self._signature, sig_type 

1148 

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
            keyids: Optional iterable of trusted keyids for this tag.
                If this tag is not signed by any key in keyids verification will
                fail. If not specified, this function only verifies that the tag
                has a valid signature.

        Raises:
            gpg.errors.BadSignatures: if GPG signature verification fails
            gpg.errors.MissingSignatures: if tag was not signed by a key
                specified in keyids
        """
        if self._signature is None:
            # Unsigned tags trivially pass verification.
            return

        import gpg

        with gpg.Context() as ctx:
            # Raises BadSignatures if the signature does not match the payload.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # produced one of the signatures; otherwise reject.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1181 

1182 

class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes
    mode: int
    sha: bytes

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
            path: Path prefix to prepend, as bytes.

        Returns:
            A new TreeEntry whose path is ``path`` joined with this entry's path.

        Raises:
            TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Report the value that failed the check (self.path), not the
            # prefix argument, so the error points at the bad data.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1195 

1196 

def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
        text: Serialized text to parse
        strict: If True, enforce strict validation
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
        ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Each record is "<octal mode> <name>\0<20-byte sha>".
        space_at = text.index(b" ", pos)
        raw_mode = text[pos:space_at]
        if strict and raw_mode.startswith(b"0"):
            # Git never writes leading zeros in modes.
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}")
        try:
            mode = int(raw_mode, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}") from exc
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]
        pos = nul_at + 21
        sha = text[nul_at + 1 : pos]
        if len(sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(sha))

1227 

1228 

def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
        items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # Record layout: "<octal mode> <name>\0<binary sha>".
        mode_bytes = (f"{mode:04o}").encode("ascii")
        yield mode_bytes + b" " + name + b"\0" + hex_to_sha(hexsha)

1240 

1241 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
        name_order: If True, iterate entries in order of their name. If
            False, iterate entries in tree order, that is, treat subtree entries as
            having '/' appended.
        entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1265 

1266 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Args:
        entry: (name, value) tuple
    """
    # Directories sort as if their name ends in '/', matching git tree order.
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1277 

1278 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name

1282 

1283 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
        name: Name of the directory entry
        mode: Mode of entry
        hexsha: Hexsha of the referenced object
        encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    # Undecodable name bytes are replaced rather than raising.
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1306 

1307 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record where the submodule boundary was hit.

        Args:
            path: Path where the submodule was encountered
            sha: SHA of the submodule
        """
        # Stored so callers can continue resolution inside the submodule.
        self.sha = sha
        self.path = path

1320 

1321 

class Tree(ShaFile):
    """A Git tree object."""

    type_name = b"tree"
    type_num = 2

    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        # Maps entry name -> (mode, hexsha).
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Read a tree from a file on disk.

        Args:
            filename: Path to the tree file

        Returns:
            A Tree object

        Raises:
            NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
            name: The name of the entry, as a string.
            value: A tuple of (mode, hexsha), where mode is the mode of the
                entry as an integral type and hexsha is the hex SHA of the entry as
                a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
            mode: The mode of the entry as an integral type. Not all
                possible modes are supported by git; see check() for details.
            name: The name of the entry, as a string.
            hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
            name_order: If True, iterate in name order instead of tree
                order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # The only modes git itself will write (plus 0o664 for leniency).
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be in canonical tree order with no duplicates.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize entries in canonical tree order."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
            Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
            lookup_obj: Callback for retrieving object by SHA1
            path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
            SubmoduleEncountered: if a gitlink is hit partway along the path
            NotTreeError: if an intermediate component is not a tree
            ValueError: if the path contains no non-empty components
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            if not p:
                continue
            # A gitlink partway through the path means the remainder lives
            # inside a submodule the caller must resolve separately.
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1511 

1512 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
        text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).

    Raises:
        ValueError: if the fragment does not start with '+' or '-'.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # A non-negative offset written with a '-' sign (e.g. -0000) must
    # round-trip, so remember that quirk separately.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # The digits encode HHMM, not seconds; use integer divmod rather than
    # float division so arbitrarily large values stay exact.
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1540 

1541 

1542def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes: 

1543 """Format a timezone for Git serialization. 

1544 

1545 Args: 

1546 offset: Timezone offset as seconds difference to UTC 

1547 unnecessary_negative_timezone: Whether to use a minus sign for 

1548 UTC or positive timezones (-0000 and --700 rather than +0000 / +0700). 

1549 """ 

1550 if offset % 60 != 0: 

1551 raise ValueError("Unable to handle non-minute offset.") 

1552 if offset < 0 or unnecessary_negative_timezone: 

1553 sign = "-" 

1554 offset = -offset 

1555 else: 

1556 sign = "+" 

1557 return ("%c%02d%02d" % (sign, offset / 3600, (offset / 60) % 60)).encode("ascii") # noqa: UP031 

1558 

1559 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse event.

    Args:
        value: Bytes representing a git commit/tag line
    Raises:
        ObjectFormatException in case of parsing error (malformed
        field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is just the identity.
        return (value, None, (None, False))
    person = value[: sep + 1]
    remainder = value[sep + 2 :]
    try:
        timetext, tztext = remainder.rsplit(b" ", 1)
        when = int(timetext)
        tz_offset, tz_neg_utc = parse_timezone(tztext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, when, (tz_offset, tz_neg_utc)

1585 

1586 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event as "<person> <seconds> <±HHMM>"."""
    tz_offset, tz_neg_utc = timezone_info
    parts = [
        person,
        str(time).encode("ascii"),
        format_timezone(tz_offset, tz_neg_utc),
    ]
    return b" ".join(parts)

1595 

1596 

@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
        chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    parents = []
    extra = []
    tree = None
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Embedded tags are stored without their trailing newline.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # field=None marks the message body.
            message = value
        else:
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            # Unknown headers are preserved so round-tripping is lossless.
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1675 

1676 

1677class Commit(ShaFile): 

1678 """A git commit object.""" 

1679 

1680 type_name = b"commit" 

1681 type_num = 1 

1682 

1683 __slots__ = ( 

1684 "_author", 

1685 "_author_time", 

1686 "_author_timezone", 

1687 "_author_timezone_neg_utc", 

1688 "_commit_time", 

1689 "_commit_timezone", 

1690 "_commit_timezone_neg_utc", 

1691 "_committer", 

1692 "_encoding", 

1693 "_extra", 

1694 "_gpgsig", 

1695 "_mergetag", 

1696 "_message", 

1697 "_parents", 

1698 "_tree", 

1699 ) 

1700 

1701 def __init__(self) -> None: 

1702 """Initialize an empty Commit.""" 

1703 super().__init__() 

1704 self._parents: list[bytes] = [] 

1705 self._encoding: Optional[bytes] = None 

1706 self._mergetag: list[Tag] = [] 

1707 self._gpgsig: Optional[bytes] = None 

1708 self._extra: list[tuple[bytes, Optional[bytes]]] = [] 

1709 self._author_timezone_neg_utc: Optional[bool] = False 

1710 self._commit_timezone_neg_utc: Optional[bool] = False 

1711 

1712 @classmethod 

1713 def from_path(cls, path: Union[str, bytes]) -> "Commit": 

1714 """Read a commit from a file on disk. 

1715 

1716 Args: 

1717 path: Path to the commit file 

1718 

1719 Returns: 

1720 A Commit object 

1721 

1722 Raises: 

1723 NotCommitError: If the file is not a commit 

1724 """ 

1725 commit = ShaFile.from_path(path) 

1726 if not isinstance(commit, cls): 

1727 raise NotCommitError(_path_to_bytes(path)) 

1728 return commit 

1729 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Parse commit headers and message body from serialized chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Embedded tags are stored without their trailing newline;
                # re-add it so the tag parses.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # field=None marks the message body.
                self._message = value
            else:
                # Unknown headers are preserved for lossless round-tripping.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

1784 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce canonical header order: tree, parent*, author, committer,
        # then (optionally) encoding.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

1836 def sign(self, keyid: Optional[str] = None) -> None: 

1837 """Sign this commit with a GPG key. 

1838 

1839 Args: 

1840 keyid: Optional GPG key ID to use for signing. If not specified, 

1841 the default GPG key will be used. 

1842 """ 

1843 import gpg 

1844 

1845 with gpg.Context(armor=True) as c: 

1846 if keyid is not None: 

1847 key = c.get_key(keyid) 

1848 with gpg.Context(armor=True, signers=[key]) as ctx: 

1849 self.gpgsig, _unused_result = ctx.sign( 

1850 self.as_raw_string(), 

1851 mode=gpg.constants.sig.mode.DETACH, 

1852 ) 

1853 else: 

1854 self.gpgsig, _unused_result = c.sign( 

1855 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1856 ) 

1857 

1858 def raw_without_sig(self) -> bytes: 

1859 """Return raw string serialization without the GPG/SSH signature. 

1860 

1861 self.gpgsig is a signature for the returned raw byte string serialization. 

1862 """ 

1863 tmp = self.copy() 

1864 assert isinstance(tmp, Commit) 

1865 tmp._gpgsig = None 

1866 tmp.gpgsig = None 

1867 return tmp.as_raw_string() 

1868 

1869 def extract_signature(self) -> tuple[bytes, Optional[bytes], Optional[bytes]]: 

1870 """Extract the payload, signature, and signature type from this commit. 

1871 

1872 Returns: 

1873 Tuple of (``payload``, ``signature``, ``signature_type``) where: 

1874 

1875 - ``payload``: The raw commit data without the signature 

1876 - ``signature``: The signature bytes if present, None otherwise 

1877 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1878 

1879 Raises: 

1880 ObjectFormatException: If signature has unknown format 

1881 """ 

1882 if self._gpgsig is None: 

1883 return self.as_raw_string(), None, None 

1884 

1885 payload = self.raw_without_sig() 

1886 

1887 # Determine signature type 

1888 if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE): 

1889 sig_type = SIGNATURE_PGP 

1890 elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE): 

1891 sig_type = SIGNATURE_SSH 

1892 else: 

1893 raise ObjectFormatException("Unknown signature format") 

1894 

1895 return payload, self._gpgsig, sig_type 

1896 

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
            keyids: Optional iterable of trusted keyids for this commit.
                If this commit is not signed by any key in keyids verification will
                fail. If not specified, this function only verifies that the commit
                has a valid signature.

        Raises:
            gpg.errors.BadSignatures: if GPG signature verification fails
            gpg.errors.MissingSignatures: if commit was not signed by a key
                specified in keyids
        """
        if self._gpgsig is None:
            # Unsigned commits trivially pass verification.
            return

        import gpg

        with gpg.Context() as ctx:
            # Raises BadSignatures if the signature does not match the payload.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # produced one of the signatures; otherwise reject.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1929 

    def _serialize(self) -> list[bytes]:
        """Serialize this commit into header/body chunks."""
        headers = []
        assert self._tree is not None
        # self._tree may be a Tree object or a raw hex sha.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the trailing newline; it is re-added when deserializing.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        # Unknown-but-preserved headers; None values are dropped.
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

1975 

# Root tree of this commit, exposed via the project's serializable_property
# helper (defined elsewhere in this file).
tree = serializable_property("tree", "Tree that is the state of this commit")

1977 

1978 def _get_parents(self) -> list[bytes]: 

1979 """Return a list of parents of this commit.""" 

1980 return self._parents 

1981 

1982 def _set_parents(self, value: list[bytes]) -> None: 

1983 """Set a list of parents of this commit.""" 

1984 self._needs_serialization = True 

1985 self._parents = value 

1986 

1987 parents = property( 

1988 _get_parents, 

1989 _set_parents, 

1990 doc="Parents of this commit, by their SHA1.", 

1991 ) 

1992 

@replace_me(since="0.21.0", remove_in="0.24.0")
def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
    """Getter for the raw unrecognized commit headers (deprecated)."""
    return self._extra

extra = property(
    _get_extra,
    doc=(
        "Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field."
    ),
)

2005 

# Remaining commit attributes, all exposed through the project's
# serializable_property helper (defined elsewhere in this file).

author = serializable_property("author", "The name of the author of the commit")

committer = serializable_property(
    "committer", "The name of the committer of the commit"
)

message = serializable_property("message", "The commit message")

commit_time = serializable_property(
    "commit_time",
    "The timestamp of the commit. As the number of seconds since the epoch.",
)

commit_timezone = serializable_property(
    "commit_timezone", "The zone the commit time is in"
)

author_time = serializable_property(
    "author_time",
    "The timestamp the commit was written. As the number of "
    "seconds since the epoch.",
)

author_timezone = serializable_property(
    "author_timezone", "Returns the zone the author time is in."
)

encoding = serializable_property("encoding", "Encoding of the commit message.")

mergetag = serializable_property("mergetag", "Associated signed tag.")

gpgsig = serializable_property("gpgsig", "GPG Signature.")

2038 

2039 

# The concrete object classes handled by this module.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Lookup table mapping each class's textual type name AND numeric type
# to the class itself, so either key form resolves an object type.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP.update({cls.type_name: cls, cls.type_num: cls})

2052 

2053 

# Keep references to the pure-Python implementations so tests can
# exercise them even when the accelerated versions are in use.
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items

try:
    # Prefer the Rust implementations when the extension is built.
    from dulwich._objects import parse_tree as _parse_tree_rs
    from dulwich._objects import sorted_tree_items as _sorted_tree_items_rs
except ImportError:
    # Extension module unavailable; stay with pure Python.
    pass
else:
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs