Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1018 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25import binascii 

26import os 

27import posixpath 

28import stat 

29import sys 

30import zlib 

31from collections.abc import Callable, Iterable, Iterator, Sequence 

32from hashlib import sha1 

33from io import BufferedIOBase, BytesIO 

34from typing import ( 

35 IO, 

36 TYPE_CHECKING, 

37 NamedTuple, 

38 TypeVar, 

39 Union, 

40) 

41 

42if sys.version_info >= (3, 11): 

43 from typing import Self 

44else: 

45 from typing_extensions import Self 

46 

47from typing import TypeGuard 

48 

49from . import replace_me 

50from .errors import ( 

51 ChecksumMismatch, 

52 FileFormatException, 

53 NotBlobError, 

54 NotCommitError, 

55 NotTagError, 

56 NotTreeError, 

57 ObjectFormatException, 

58) 

59from .file import GitFile 

60 

61if TYPE_CHECKING: 

62 from _hashlib import HASH 

63 

64 from .file import _GitFile 

65 

# Hex representation of the all-zero sha1, used to denote a missing object.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# File-mode format bits marking a gitlink (submodule) tree entry;
# compared against stat.S_IFMT(mode) in S_ISGITLINK below.
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"


# Alias for bytes used throughout this module for object identifiers.
ObjectID = bytes

98 

99 

# Raised by ShaFile._parse_file when a loose-object file contains no data.
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""

102 

103 

def S_ISGITLINK(m: int) -> bool:
    """Tell whether the given mode bits describe a gitlink (submodule) entry.

    Args:
      m: mode to inspect
    Returns: True when the file-format bits equal ``S_IFGITLINK``
    """
    fmt_bits = stat.S_IFMT(m)
    return fmt_bits == S_IFGITLINK

112 

113 

def _decompress(string: bytes) -> bytes:
    """Inflate a zlib-compressed byte string in one shot."""
    decompressor = zlib.decompressobj()
    return decompressor.decompress(string) + decompressor.flush()

119 

120 

def sha_to_hex(sha: ObjectID) -> bytes:
    """Convert a 20-byte binary SHA-1 into its 40-byte hex form."""
    hexsha = binascii.hexlify(sha)
    # A well-formed binary sha always hexlifies to exactly 40 digits.
    assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}"
    return hexsha

126 

127 

128def hex_to_sha(hex: bytes | str) -> bytes: 

129 """Takes a hex sha and returns a binary sha.""" 

130 assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}" 

131 try: 

132 return binascii.unhexlify(hex) 

133 except TypeError as exc: 

134 if not isinstance(hex, bytes): 

135 raise 

136 raise ValueError(exc.args[0]) from exc 

137 

138 

139def valid_hexsha(hex: bytes | str) -> bool: 

140 """Check if a string is a valid hex SHA. 

141 

142 Args: 

143 hex: Hex string to check 

144 

145 Returns: 

146 True if valid hex SHA, False otherwise 

147 """ 

148 if len(hex) != 40: 

149 return False 

150 try: 

151 binascii.unhexlify(hex) 

152 except (TypeError, binascii.Error): 

153 return False 

154 else: 

155 return True 

156 

157 

# Constrained TypeVar: the path argument and the returned filename must
# share one concrete type (str or bytes).
PathT = TypeVar("PathT", str, bytes)

159 

160 

def hex_to_filename(path: PathT, hex: str | bytes) -> PathT:
    """Take a hex sha and return its loose-object filename under *path*.

    ``os.path.join`` requires all arguments to share a single type, so
    *hex* is first converted (str <-> bytes) to match *path*; the first
    two characters become the fan-out directory, the rest the file name.
    """
    if isinstance(path, str):
        text = hex.decode("ascii") if isinstance(hex, bytes) else hex
        joined = os.path.join(path, text[:2], text[2:])
        assert isinstance(joined, str)
        return joined
    # path is bytes
    raw = hex.encode("ascii") if isinstance(hex, str) else hex
    joined_b = os.path.join(path, raw[:2], raw[2:])
    assert isinstance(joined_b, bytes)
    return joined_b

187 

188 

def filename_to_hex(filename: str | bytes) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # Only the last two path components matter: <fan-out dir>/<remainder>.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        parts = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        fan_out, remainder = parts
        assert len(fan_out) == 2 and len(remainder) == 38, errmsg
        hex_bytes = (fan_out + remainder).encode("ascii")
    else:
        # filename is bytes
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        parts_b = filename.rsplit(sep, 2)[-2:]
        assert len(parts_b) == 2, errmsg
        fan_out_b, remainder_b = parts_b
        assert len(fan_out_b) == 2 and len(remainder_b) == 38, errmsg
        hex_bytes = fan_out_b + remainder_b
    hex_to_sha(hex_bytes)  # validates the digits before returning
    return hex_bytes.decode("ascii")

212 

213 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    obj_cls = object_class(num_type)
    if obj_cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    # Layout: b"<type name> <decimal length>\0"
    return b"".join([obj_cls.type_name, b" ", str(length).encode("ascii"), b"\0"])

220 

221 

222def serializable_property(name: str, docstring: str | None = None) -> property: 

223 """A property that helps tracking whether serialization is necessary.""" 

224 

225 def set(obj: "ShaFile", value: object) -> None: 

226 """Set the property value and mark the object as needing serialization. 

227 

228 Args: 

229 obj: The ShaFile object 

230 value: The value to set 

231 """ 

232 setattr(obj, "_" + name, value) 

233 obj._needs_serialization = True 

234 

235 def get(obj: "ShaFile") -> object: 

236 """Get the property value. 

237 

238 Args: 

239 obj: The ShaFile object 

240 

241 Returns: 

242 The property value 

243 """ 

244 return getattr(obj, "_" + name) 

245 

246 return property(get, set, doc=docstring) 

247 

248 

def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Look up the ShaFile subclass registered for a type name or number.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None
      if type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)

258 

259 

def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Validate that *hex* is a well-formed hex sha, raising otherwise.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")

271 

272 

273def check_identity(identity: bytes | None, error_msg: str) -> None: 

274 """Check if the specified identity is valid. 

275 

276 This will raise an exception if the identity is not valid. 

277 

278 Args: 

279 identity: Identity string 

280 error_msg: Error message to use in exception 

281 """ 

282 if identity is None: 

283 raise ObjectFormatException(error_msg) 

284 email_start = identity.find(b"<") 

285 email_end = identity.find(b">") 

286 if not all( 

287 [ 

288 email_start >= 1, 

289 identity[email_start - 1] == b" "[0], 

290 identity.find(b"<", email_start + 1) == -1, 

291 email_end == len(identity) - 1, 

292 b"\0" not in identity, 

293 b"\n" not in identity, 

294 ] 

295 ): 

296 raise ObjectFormatException(error_msg) 

297 

298 

def _path_to_bytes(path: str | bytes) -> bytes:
    """Return *path* as bytes (surrogateescape-encoded) for error messages."""
    if isinstance(path, bytes):
        return path
    return path.encode("utf-8", "surrogateescape")

304 

305 

def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Times beyond the signed 64-bit range would overflow downstream.
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

318 

319 

def git_line(*items: bytes) -> bytes:
    """Join *items* with single spaces and terminate with a newline."""
    line = b" ".join(items)
    return line + b"\n"

323 

324 

class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: str | bytes) -> None:
        """Store a predetermined SHA value.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        normalized = hexsha.encode("ascii") if isinstance(hexsha, str) else hexsha
        if not isinstance(normalized, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {normalized!r}")
        self._hexsha = normalized
        self._sha = hex_to_sha(normalized)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")

350 

351 

# Type guard functions for runtime type narrowing
if TYPE_CHECKING:
    # Static variants: TypeGuard return annotations let type checkers narrow
    # a ShaFile to the concrete subclass after a passing check.

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing; bodies are identical to the
    # static variants above, only the return annotations differ.
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"

387 

388 

class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when _chunked_text is stale relative to the object's fields.
    _needs_serialization: bool
    # Per-subclass object type name (e.g. b"blob") and number.
    type_name: bytes
    type_num: int
    # Cached serialized content, as a list of byte chunks.
    _chunked_text: list[bytes] | None
    # Cached sha object; None means it must be recomputed.
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the NUL that terminates "<type> <size>" shows up.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL-terminated header is the object content.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached sha before re-serializing.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(self, chunks: list[bytes], sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            # Trust the caller-supplied sha instead of recomputing it.
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Type number lives in bits 4-6 of the first byte.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # Size is varint-encoded: high bit set means another byte follows.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Detect the legacy format by its leading zlib stream header.

        A zlib header has compression method 8 (deflate) in the low nibble
        of the first byte and a two-byte value divisible by 31 (RFC 1950).
        """
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Read an object from *f*, dispatching on legacy vs. new format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Set this object's fields from serialized chunks (subclass hook)."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Produce serialized chunks from this object's fields (subclass hook)."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: str | bytes) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip through deserialization; any exception means the
            # serialized form is malformed.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the "<type> <size>" NUL-terminated header for this object."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

704 

705 

class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new Blob object."""
        super().__init__()
        self._chunked_text = []
        # A blob's chunks ARE its serialization, so it never needs one.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        """Return the blob payload as a single bytes object."""
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        """Replace the blob payload with *data*."""
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        """Return the blob payload as its stored chunks."""
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        """Replace the blob payload with the given chunks."""
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        """Blobs serialize to their stored chunks unchanged."""
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Blobs deserialize by storing the chunks unchanged."""
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: str | bytes) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # Multiple chunks: a line may span chunk boundaries, so carry the
        # last (possibly incomplete) line of each chunk in `remaining` and
        # prepend it to the first line of the next chunk.
        remaining = None
        ret = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if len(lines) > 1:
                ret.append((remaining or b"") + lines[0])
                ret.extend(lines[1:-1])
                remaining = lines[-1]
            elif len(lines) == 1:
                # Chunk holds a single (partial) line: keep accumulating.
                if remaining is None:
                    remaining = lines.pop()
                else:
                    remaining += lines.pop()
        if remaining is not None:
            ret.append(remaining)
        return ret

803 

804 

def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
      order read from the text, possibly including duplicates. Includes a
      field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None
    v = b""
    eof = False

    def _strip_last_newline(value: bytes) -> bytes:
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)

    else:
        # for/else: only runs when the loop exhausted the file without
        # hitting the blank separator line above.
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()

859 

860 

def _format_message(
    headers: Sequence[tuple[bytes, bytes]], body: bytes | None
) -> Iterator[bytes]:
    """Serialize a tag/commit message from (field, value) headers and a body.

    Multi-line header values are emitted as continuation lines prefixed with
    a single space, mirroring what ``_parse_message`` accepts.
    """
    for field, value in headers:
        first, *continuations = value.split(b"\n")
        yield git_line(field, first)
        for continuation in continuations:
            yield b" " + continuation + b"\n"
    yield b"\n"  # There must be a new line after the headers
    if body:
        yield body

872 

873 

874class Tag(ShaFile): 

875 """A Git Tag object.""" 

876 

877 type_name = b"tag" 

878 type_num = 4 

879 

880 __slots__ = ( 

881 "_message", 

882 "_name", 

883 "_object_class", 

884 "_object_sha", 

885 "_signature", 

886 "_tag_time", 

887 "_tag_timezone", 

888 "_tag_timezone_neg_utc", 

889 "_tagger", 

890 ) 

891 

892 _message: bytes | None 

893 _name: bytes | None 

894 _object_class: type["ShaFile"] | None 

895 _object_sha: bytes | None 

896 _signature: bytes | None 

897 _tag_time: int | None 

898 _tag_timezone: int | None 

899 _tag_timezone_neg_utc: bool | None 

900 _tagger: bytes | None 

901 

902 def __init__(self) -> None: 

903 """Initialize a new Tag object.""" 

904 super().__init__() 

905 self._tagger = None 

906 self._tag_time = None 

907 self._tag_timezone = None 

908 self._tag_timezone_neg_utc = False 

909 self._signature: bytes | None = None 

910 

911 @classmethod 

912 def from_path(cls, filename: str | bytes) -> "Tag": 

913 """Read a tag from a file on disk. 

914 

915 Args: 

916 filename: Path to the tag file 

917 

918 Returns: 

919 A Tag object 

920 

921 Raises: 

922 NotTagError: If the file is not a tag 

923 """ 

924 tag = ShaFile.from_path(filename) 

925 if not isinstance(tag, cls): 

926 raise NotTagError(_path_to_bytes(filename)) 

927 return tag 

928 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        # tagger is optional; validate only when present.
        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce canonical header order: object, type, tag, then tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

967 

    def _serialize(self) -> list[bytes]:
        """Serialize the tag's headers, message and signature to chunks.

        Raises:
          ObjectFormatException: if a mandatory field (object sha, object
            class, name) is unset, or tagger is set with a tag time but
            without timezone info
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp: emit the bare identity.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        # The signature, when present, is appended directly after the message.
        if self.message is None and self._signature is None:
            body = None
        else:
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

1001 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag.

        Args:
          chunks: Serialized tag body as a list of byte chunks.

        Raises:
          ObjectFormatException: if the tagger value is missing, the object
            type is unknown, or an unrecognized header field is encountered.
        """
        # Reset optional tagger/time fields; they are only populated when a
        # tagger header is present.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                # Map the type name (e.g. b"commit") to its ShaFile subclass.
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # field None marks the message body (everything after the
                # header section).
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        # Split the body: text before the signature marker is
                        # the message, the rest is the detached signature.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

1052 

1053 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

1054 """Get the object pointed to by this tag. 

1055 

1056 Returns: tuple of (object class, sha). 

1057 """ 

1058 if self._object_class is None or self._object_sha is None: 

1059 raise ValueError("Tag object is not properly initialized") 

1060 return (self._object_class, self._object_sha) 

1061 

1062 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

1063 (self._object_class, self._object_sha) = value 

1064 self._needs_serialization = True 

1065 

    # Composite accessor pairing _get_object/_set_object above.
    object = property(_get_object, _set_object)

    # The following fields are declared via the serializable_property helper
    # defined elsewhere in this module.
    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

1082 

1083 def sign(self, keyid: str | None = None) -> None: 

1084 """Sign this tag with a GPG key. 

1085 

1086 Args: 

1087 keyid: Optional GPG key ID to use for signing. If not specified, 

1088 the default GPG key will be used. 

1089 """ 

1090 import gpg 

1091 

1092 with gpg.Context(armor=True) as c: 

1093 if keyid is not None: 

1094 key = c.get_key(keyid) 

1095 with gpg.Context(armor=True, signers=[key]) as ctx: 

1096 self.signature, _unused_result = ctx.sign( 

1097 self.as_raw_string(), 

1098 mode=gpg.constants.sig.mode.DETACH, 

1099 ) 

1100 else: 

1101 self.signature, _unused_result = c.sign( 

1102 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1103 ) 

1104 

1105 def raw_without_sig(self) -> bytes: 

1106 """Return raw string serialization without the GPG/SSH signature. 

1107 

1108 self.signature is a signature for the returned raw byte string serialization. 

1109 """ 

1110 ret = self.as_raw_string() 

1111 if self._signature: 

1112 ret = ret[: -len(self._signature)] 

1113 return ret 

1114 

1115 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]: 

1116 """Extract the payload, signature, and signature type from this tag. 

1117 

1118 Returns: 

1119 Tuple of (``payload``, ``signature``, ``signature_type``) where: 

1120 

1121 - ``payload``: The raw tag data without the signature 

1122 - ``signature``: The signature bytes if present, None otherwise 

1123 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1124 

1125 Raises: 

1126 ObjectFormatException: If signature has unknown format 

1127 """ 

1128 if self._signature is None: 

1129 return self.as_raw_string(), None, None 

1130 

1131 payload = self.raw_without_sig() 

1132 

1133 # Determine signature type 

1134 if self._signature.startswith(BEGIN_PGP_SIGNATURE): 

1135 sig_type = SIGNATURE_PGP 

1136 elif self._signature.startswith(BEGIN_SSH_SIGNATURE): 

1137 sig_type = SIGNATURE_SSH 

1138 else: 

1139 raise ObjectFormatException("Unknown signature format") 

1140 

1141 return payload, self._signature, sig_type 

1142 

1143 def verify(self, keyids: Iterable[str] | None = None) -> None: 

1144 """Verify GPG signature for this tag (if it is signed). 

1145 

1146 Args: 

1147 keyids: Optional iterable of trusted keyids for this tag. 

1148 If this tag is not signed by any key in keyids verification will 

1149 fail. If not specified, this function only verifies that the tag 

1150 has a valid signature. 

1151 

1152 Raises: 

1153 gpg.errors.BadSignatures: if GPG signature verification fails 

1154 gpg.errors.MissingSignatures: if tag was not signed by a key 

1155 specified in keyids 

1156 """ 

1157 if self._signature is None: 

1158 return 

1159 

1160 import gpg 

1161 

1162 with gpg.Context() as ctx: 

1163 data, result = ctx.verify( 

1164 self.raw_without_sig(), 

1165 signature=self._signature, 

1166 ) 

1167 if keyids: 

1168 keys = [ctx.get_key(key) for key in keyids] 

1169 for key in keys: 

1170 for subkey in key.subkeys: 

1171 for sig in result.signatures: 

1172 if subkey.can_sign and subkey.fpr == sig.fpr: 

1173 return 

1174 raise gpg.errors.MissingSignatures(result, keys, results=(data, result)) 

1175 

1176 

class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes  # entry name, relative to its containing tree
    mode: int  # entry mode as an integer
    sha: bytes  # hex SHA of the referenced object

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Directory prefix to join in front of this entry's path.

        Returns:
          A new TreeEntry whose path is ``path`` joined with ``self.path``.

        Raises:
          TypeError: if this entry's path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the offending value (self.path), not the
            # prefix argument, in the error message.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1189 

1190 

def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, reject modes written with a leading zero
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    size = len(text)
    while pos < size:
        # Each entry is "<octal mode> <name>\0<20 raw sha bytes>".
        space_at = text.index(b" ", pos)
        mode_text = text[pos:space_at]
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]
        pos = nul_at + 21
        raw_sha = text[nul_at + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))

1221 

1222 

def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # "<octal mode> <name>\0<raw sha>" per entry.
        mode_bytes = (f"{mode:04o}").encode("ascii")
        yield b"".join([mode_bytes, b" ", name, b"\0", hex_to_sha(hexsha)])

1234 

1235 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries
        as having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over TreeEntry namedtuples
    """
    sort_key = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=sort_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1259 

1260 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry in tree order.

    Directories sort as if their name carried a trailing '/'.

    Args:
      entry: (name, (mode, sha)) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1271 

1272 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry in plain name order."""
    name, _value = entry
    return name

1276 

1277 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # Entries with the directory bit set are shown as trees, everything
    # else as blobs.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    decoded_name = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {hexsha.decode('ascii')}\t{decoded_name}\n"

1300 

1301 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record where the submodule was found.

        Args:
          path: Path at which the submodule entry was encountered
          sha: SHA of the submodule entry
        """
        self.sha = sha
        self.path = path

1314 

1315 

class Tree(ShaFile):
    """A Git tree object.

    A tree maps entry names (bytes) to (mode, hex sha) pairs. Mutating
    methods flip ``_needs_serialization`` so the cached serialization is
    rebuilt on demand.
    """

    type_name = b"tree"
    type_num = 2

    # Single slot: the entry dictionary (no per-instance __dict__).
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: str | bytes) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get the (mode, hexsha) tree entry for name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Validates entry shas, names, modes, sort order and uniqueness.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            # Reject path separators and special directory names.
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be sorted in tree order and unique by name.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize the entries (in tree order) into chunks."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if a gitlink entry is crossed mid-path
          NotTreeError: if an intermediate path component is not a tree
          ValueError: if the path contains no valid component
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/duplicate slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Cannot descend into a submodule; report where it happened.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1505 

1506 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).

    Raises:
      ValueError: if the fragment does not start with '+' or '-', or the
        remainder is not an integer.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        # Idiom fix: f-string instead of str.format(**vars()); the rendered
        # message is identical.
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # A '-' sign on a non-negative offset (e.g. "-0000", "--700") is
    # remembered so serialization can reproduce it.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # The offset encodes HHMM; use exact integer divmod instead of float
    # division + truncation.
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1534 

1535 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # int() truncates toward zero, matching the %d conversion applied to the
    # float divisions this replaces (relevant when offset is negative here,
    # i.e. the unnecessary_negative_timezone case).
    hours = int(offset / 3600)
    minutes = int((offset / 60) % 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")

1552 

1553 

1554def parse_time_entry( 

1555 value: bytes, 

1556) -> tuple[bytes, int | None, tuple[int | None, bool]]: 

1557 """Parse event. 

1558 

1559 Args: 

1560 value: Bytes representing a git commit/tag line 

1561 Raises: 

1562 ObjectFormatException in case of parsing error (malformed 

1563 field date) 

1564 Returns: Tuple of (author, time, (timezone, timezone_neg_utc)) 

1565 """ 

1566 try: 

1567 sep = value.rindex(b"> ") 

1568 except ValueError: 

1569 return (value, None, (None, False)) 

1570 try: 

1571 person = value[0 : sep + 1] 

1572 rest = value[sep + 2 :] 

1573 timetext, timezonetext = rest.rsplit(b" ", 1) 

1574 time = int(timetext) 

1575 timezone, timezone_neg_utc = parse_timezone(timezonetext) 

1576 except ValueError as exc: 

1577 raise ObjectFormatException(exc) from exc 

1578 return person, time, (timezone, timezone_neg_utc) 

1579 

1580 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an identity/timestamp/timezone triple as a header value."""
    timezone, timezone_neg_utc = timezone_info
    parts = [
        person,
        str(time).encode("ascii"),
        format_timezone(timezone, timezone_neg_utc),
    ]
    return b" ".join(parts)

1589 

1590 

@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated (see the @replace_me decorator above).

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)

    Raises:
      ObjectFormatException: if a header that requires a value has none
    """
    parents = []
    extra = []
    tree = None
    # (identity, timestamp, (timezone, timezone_neg_utc)) placeholders until
    # the corresponding headers are seen.
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # A mergetag header embeds a full tag object (sans trailing
            # newline, which is restored here).
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # field None marks the commit message body.
            message = value
        else:
            # Unknown headers are preserved verbatim.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1673 

1674 

class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    # Commits are created in large numbers; __slots__ avoids a per-instance
    # __dict__.
    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

1698 

1699 def __init__(self) -> None: 

1700 """Initialize an empty Commit.""" 

1701 super().__init__() 

1702 self._parents: list[bytes] = [] 

1703 self._encoding: bytes | None = None 

1704 self._mergetag: list[Tag] = [] 

1705 self._gpgsig: bytes | None = None 

1706 self._extra: list[tuple[bytes, bytes | None]] = [] 

1707 self._author_timezone_neg_utc: bool | None = False 

1708 self._commit_timezone_neg_utc: bool | None = False 

1709 

1710 @classmethod 

1711 def from_path(cls, path: str | bytes) -> "Commit": 

1712 """Read a commit from a file on disk. 

1713 

1714 Args: 

1715 path: Path to the commit file 

1716 

1717 Returns: 

1718 A Commit object 

1719 

1720 Raises: 

1721 NotCommitError: If the file is not a commit 

1722 """ 

1723 commit = ShaFile.from_path(path) 

1724 if not isinstance(commit, cls): 

1725 raise NotCommitError(_path_to_bytes(path)) 

1726 return commit 

1727 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields from the serialized chunks.

        Args:
          chunks: Serialized commit body as a list of byte chunks.

        Raises:
          ObjectFormatException: if an author or committer header has no value
        """
        self._parents = []
        self._extra = []
        self._tree = None
        # (identity, timestamp, (timezone, timezone_neg_utc)) placeholders
        # until the corresponding headers are seen.
        author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # A mergetag header embeds a full tag object (sans trailing
                # newline, restored here).
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # field None marks the commit message body.
                self._message = value
            else:
                # Unknown headers are kept verbatim for reserialization.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

1786 

    def check(self) -> None:
        """Check this object for internal consistency.

        Validates presence of mandatory fields, sha/identity/time formats,
        and the ordering of headers in the serialized form.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce header order: tree, parent*, author, committer, encoding.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

1837 

1838 def sign(self, keyid: str | None = None) -> None: 

1839 """Sign this commit with a GPG key. 

1840 

1841 Args: 

1842 keyid: Optional GPG key ID to use for signing. If not specified, 

1843 the default GPG key will be used. 

1844 """ 

1845 import gpg 

1846 

1847 with gpg.Context(armor=True) as c: 

1848 if keyid is not None: 

1849 key = c.get_key(keyid) 

1850 with gpg.Context(armor=True, signers=[key]) as ctx: 

1851 self.gpgsig, _unused_result = ctx.sign( 

1852 self.as_raw_string(), 

1853 mode=gpg.constants.sig.mode.DETACH, 

1854 ) 

1855 else: 

1856 self.gpgsig, _unused_result = c.sign( 

1857 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1858 ) 

1859 

1860 def raw_without_sig(self) -> bytes: 

1861 """Return raw string serialization without the GPG/SSH signature. 

1862 

1863 self.gpgsig is a signature for the returned raw byte string serialization. 

1864 """ 

1865 tmp = self.copy() 

1866 assert isinstance(tmp, Commit) 

1867 tmp._gpgsig = None 

1868 tmp.gpgsig = None 

1869 return tmp.as_raw_string() 

1870 

1871 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]: 

1872 """Extract the payload, signature, and signature type from this commit. 

1873 

1874 Returns: 

1875 Tuple of (``payload``, ``signature``, ``signature_type``) where: 

1876 

1877 - ``payload``: The raw commit data without the signature 

1878 - ``signature``: The signature bytes if present, None otherwise 

1879 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1880 

1881 Raises: 

1882 ObjectFormatException: If signature has unknown format 

1883 """ 

1884 if self._gpgsig is None: 

1885 return self.as_raw_string(), None, None 

1886 

1887 payload = self.raw_without_sig() 

1888 

1889 # Determine signature type 

1890 if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE): 

1891 sig_type = SIGNATURE_PGP 

1892 elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE): 

1893 sig_type = SIGNATURE_SSH 

1894 else: 

1895 raise ObjectFormatException("Unknown signature format") 

1896 

1897 return payload, self._gpgsig, sig_type 

1898 

1899 def verify(self, keyids: Iterable[str] | None = None) -> None: 

1900 """Verify GPG signature for this commit (if it is signed). 

1901 

1902 Args: 

1903 keyids: Optional iterable of trusted keyids for this commit. 

1904 If this commit is not signed by any key in keyids verification will 

1905 fail. If not specified, this function only verifies that the commit 

1906 has a valid signature. 

1907 

1908 Raises: 

1909 gpg.errors.BadSignatures: if GPG signature verification fails 

1910 gpg.errors.MissingSignatures: if commit was not signed by a key 

1911 specified in keyids 

1912 """ 

1913 if self._gpgsig is None: 

1914 return 

1915 

1916 import gpg 

1917 

1918 with gpg.Context() as ctx: 

1919 data, result = ctx.verify( 

1920 self.raw_without_sig(), 

1921 signature=self._gpgsig, 

1922 ) 

1923 if keyids: 

1924 keys = [ctx.get_key(key) for key in keyids] 

1925 for key in keys: 

1926 for subkey in key.subkeys: 

1927 for sig in result.signatures: 

1928 if subkey.can_sign and subkey.fpr == sig.fpr: 

1929 return 

1930 raise gpg.errors.MissingSignatures(result, keys, results=(data, result)) 

1931 

    def _serialize(self) -> list[bytes]:
        """Serialize this commit into chunks.

        Headers are emitted in canonical order (tree, parents, author,
        committer, encoding, mergetags, extras, gpgsig) followed by the
        message body.
        """
        headers = []
        assert self._tree is not None
        # Accept either a Tree object or a raw hex sha for the tree field.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Embed the tag without its trailing newline.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        # Re-emit unrecognized headers verbatim (skipping valueless ones).
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

1977 

1978 tree = serializable_property("tree", "Tree that is the state of this commit") 

1979 

1980 def _get_parents(self) -> list[bytes]: 

1981 """Return a list of parents of this commit.""" 

1982 return self._parents 

1983 

1984 def _set_parents(self, value: list[bytes]) -> None: 

1985 """Set a list of parents of this commit.""" 

1986 self._needs_serialization = True 

1987 self._parents = value 

1988 

    # Read/write access to parent SHAs; assignment marks the commit for
    # reserialization via _set_parents.
    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

1994 

1995 @replace_me(since="0.21.0", remove_in="0.24.0") 

1996 def _get_extra(self) -> list[tuple[bytes, bytes | None]]: 

1997 """Return extra settings of this commit.""" 

1998 return self._extra 

1999 

    # Read-only view of unrecognized headers, preserved verbatim so that
    # reserialization round-trips byte-for-byte.
    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

2007 

    # --- Serialized commit fields; setting any of these marks the object
    # --- for reserialization (see serializable_property).

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

2040 

2041 

# All concrete git object classes, used below to build the lookup table
# from type name/number to class.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

2048 

# Lookup table mapping each object class's type name (bytes) and type
# number (int) to the class itself.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP.update({cls.type_name: cls, cls.type_num: cls})

2054 

2055 

# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Prefer the Rust implementations when the extension is available.
    from dulwich._objects import parse_tree as _parse_tree_rs
    from dulwich._objects import sorted_tree_items as _sorted_tree_items_rs
except ImportError:
    # Extension not built/installed; keep the pure-python versions.
    pass
else:
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs