Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1018 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25import binascii 

26import os 

27import posixpath 

28import stat 

29import sys 

30import zlib 

31from collections.abc import Callable, Iterable, Iterator, Sequence 

32from hashlib import sha1 

33from io import BufferedIOBase, BytesIO 

34from typing import ( 

35 IO, 

36 TYPE_CHECKING, 

37 NamedTuple, 

38 TypeVar, 

39) 

40 

41if sys.version_info >= (3, 11): 

42 from typing import Self 

43else: 

44 from typing_extensions import Self 

45 

46from typing import TypeGuard 

47 

48from . import replace_me 

49from .errors import ( 

50 ChecksumMismatch, 

51 FileFormatException, 

52 NotBlobError, 

53 NotCommitError, 

54 NotTagError, 

55 NotTreeError, 

56 ObjectFormatException, 

57) 

58from .file import GitFile 

59 

60if TYPE_CHECKING: 

61 from _hashlib import HASH 

62 

63 from .file import _GitFile 

64 

# The all-zero SHA-1, used by git to denote "no object" (e.g. ref creation).
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"tag"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# File-mode bits git uses for a submodule (gitlink) tree entry.
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Markers that introduce a detached signature inside a tag/commit body.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"


# Alias for a (hex) object identifier as used throughout this module.
ObjectID = bytes

97 

98 

class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised when a loose-object file reads as zero bytes (see
    ``ShaFile._parse_file``), typically indicating on-disk corruption.
    """

101 

102 

def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    file_type = stat.S_IFMT(m)
    return file_type == S_IFGITLINK

111 

112 

113def _decompress(string: bytes) -> bytes: 

114 dcomp = zlib.decompressobj() 

115 dcomped = dcomp.decompress(string) 

116 dcomped += dcomp.flush() 

117 return dcomped 

118 

119 

def sha_to_hex(sha: ObjectID) -> bytes:
    """Takes a string and returns the hex of the sha within."""
    hex_form = binascii.hexlify(sha)
    assert len(hex_form) == 40, f"Incorrect length of sha1 string: {hex_form!r}"
    return hex_form

125 

126 

127def hex_to_sha(hex: bytes | str) -> bytes: 

128 """Takes a hex sha and returns a binary sha.""" 

129 assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}" 

130 try: 

131 return binascii.unhexlify(hex) 

132 except TypeError as exc: 

133 if not isinstance(hex, bytes): 

134 raise 

135 raise ValueError(exc.args[0]) from exc 

136 

137 

138def valid_hexsha(hex: bytes | str) -> bool: 

139 """Check if a string is a valid hex SHA. 

140 

141 Args: 

142 hex: Hex string to check 

143 

144 Returns: 

145 True if valid hex SHA, False otherwise 

146 """ 

147 if len(hex) != 40: 

148 return False 

149 try: 

150 binascii.unhexlify(hex) 

151 except (TypeError, binascii.Error): 

152 return False 

153 else: 

154 return True 

155 

156 

157PathT = TypeVar("PathT", str, bytes) 

158 

159 

160def hex_to_filename(path: PathT, hex: str | bytes) -> PathT: 

161 """Takes a hex sha and returns its filename relative to the given path.""" 

162 # os.path.join accepts bytes or unicode, but all args must be of the same 

163 # type. Make sure that hex which is expected to be bytes, is the same type 

164 # as path. 

165 if isinstance(path, str): 

166 if isinstance(hex, bytes): 

167 hex_str = hex.decode("ascii") 

168 else: 

169 hex_str = hex 

170 dir_name = hex_str[:2] 

171 file_name = hex_str[2:] 

172 result = os.path.join(path, dir_name, file_name) 

173 assert isinstance(result, str) 

174 return result 

175 else: 

176 # path is bytes 

177 if isinstance(hex, str): 

178 hex_bytes = hex.encode("ascii") 

179 else: 

180 hex_bytes = hex 

181 dir_name_b = hex_bytes[:2] 

182 file_name_b = hex_bytes[2:] 

183 result_b = os.path.join(path, dir_name_b, file_name_b) 

184 assert isinstance(result_b, bytes) 

185 return result_b 

186 

187 

def filename_to_hex(filename: str | bytes) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # An object lives at <...>/<2-char dir>/<38-char name>; recombine the
    # last two path components into the 40-character hex digest.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        parts = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        head, tail = parts
        assert len(head) == 2 and len(tail) == 38, errmsg
        hex_bytes = (head + tail).encode("ascii")
    else:
        # filename is bytes; normalise the separator to bytes as well.
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        parts_b = filename.rsplit(sep, 2)[-2:]
        assert len(parts_b) == 2, errmsg
        head_b, tail_b = parts_b
        assert len(head_b) == 2 and len(tail_b) == 38, errmsg
        hex_bytes = head_b + tail_b
    # Validates the digest (raises on malformed input) before decoding.
    hex_to_sha(hex_bytes)
    return hex_bytes.decode("ascii")

211 

212 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    # Loose-object header layout: "<type> <decimal length>\0".
    return b"".join([cls.type_name, b" ", str(length).encode("ascii"), b"\0"])

219 

220 

221def serializable_property(name: str, docstring: str | None = None) -> property: 

222 """A property that helps tracking whether serialization is necessary.""" 

223 

224 def set(obj: "ShaFile", value: object) -> None: 

225 """Set the property value and mark the object as needing serialization. 

226 

227 Args: 

228 obj: The ShaFile object 

229 value: The value to set 

230 """ 

231 setattr(obj, "_" + name, value) 

232 obj._needs_serialization = True 

233 

234 def get(obj: "ShaFile") -> object: 

235 """Get the property value. 

236 

237 Args: 

238 obj: The ShaFile object 

239 

240 Returns: 

241 The property value 

242 """ 

243 return getattr(obj, "_" + name) 

244 

245 return property(get, set, doc=docstring) 

246 

247 

def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    # _TYPE_MAP is keyed on both the bytes name and the numeric type.
    return _TYPE_MAP.get(type)

257 

258 

def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")

270 

271 

272def check_identity(identity: bytes | None, error_msg: str) -> None: 

273 """Check if the specified identity is valid. 

274 

275 This will raise an exception if the identity is not valid. 

276 

277 Args: 

278 identity: Identity string 

279 error_msg: Error message to use in exception 

280 """ 

281 if identity is None: 

282 raise ObjectFormatException(error_msg) 

283 email_start = identity.find(b"<") 

284 email_end = identity.find(b">") 

285 if not all( 

286 [ 

287 email_start >= 1, 

288 identity[email_start - 1] == b" "[0], 

289 identity.find(b"<", email_start + 1) == -1, 

290 email_end == len(identity) - 1, 

291 b"\0" not in identity, 

292 b"\n" not in identity, 

293 ] 

294 ): 

295 raise ObjectFormatException(error_msg) 

296 

297 

298def _path_to_bytes(path: str | bytes) -> bytes: 

299 """Convert a path to bytes for use in error messages.""" 

300 if isinstance(path, str): 

301 return path.encode("utf-8", "surrogateescape") 

302 return path 

303 

304 

def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Anything beyond a signed 64-bit value would overflow downstream.
    if time_seconds > MAX_TIME:
        raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

317 

318 

def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    joined = b" ".join(items)
    return joined + b"\n"

322 

323 

324class FixedSha: 

325 """SHA object that behaves like hashlib's but is given a fixed value.""" 

326 

327 __slots__ = ("_hexsha", "_sha") 

328 

329 def __init__(self, hexsha: str | bytes) -> None: 

330 """Initialize FixedSha with a fixed SHA value. 

331 

332 Args: 

333 hexsha: Hex SHA value as string or bytes 

334 """ 

335 if isinstance(hexsha, str): 

336 hexsha = hexsha.encode("ascii") 

337 if not isinstance(hexsha, bytes): 

338 raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}") 

339 self._hexsha = hexsha 

340 self._sha = hex_to_sha(hexsha) 

341 

342 def digest(self) -> bytes: 

343 """Return the raw SHA digest.""" 

344 return self._sha 

345 

346 def hexdigest(self) -> str: 

347 """Return the hex SHA digest.""" 

348 return self._hexsha.decode("ascii") 

349 

350 

# Type guard functions for runtime type narrowing.
# The two branches must define identical behavior: the TYPE_CHECKING branch
# exists only so static checkers see TypeGuard return types and can narrow
# ShaFile to the concrete subclass; at runtime the plain-bool versions run.
if TYPE_CHECKING:

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"

386 

387 

class ShaFile:
    """A git SHA file.

    Abstract base for all loose-object types (Blob, Tree, Commit, Tag).
    Subclasses implement ``_serialize``/``_deserialize``; this class handles
    the loose-object encodings, SHA computation/caching, and comparison.
    """

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when attribute mutations have invalidated _chunked_text/_sha.
    _needs_serialization: bool
    type_name: bytes
    type_num: int
    _chunked_text: list[bytes] | None
    _sha: "FixedSha | None | HASH"

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the NUL that terminates "<type> <size>\0"
        # appears in the decompressed stream.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the header NUL is the object payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA; it is lazily recomputed in sha().
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(self, chunks: list[bytes], sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            # Trust the caller-provided SHA instead of recomputing it.
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Bits 4-6 of the first byte carry the numeric object type.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # Size is varint-encoded: high bit set means more bytes follow.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Report whether the first two bytes look like a zlib stream.

        Legacy (fully zlib-deflated) objects start with a valid zlib header:
        low nibble 8 (deflate), and the two-byte header divisible by 31.
        """
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        """Read a loose object from *f*, dispatching on its on-disk format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate attributes from raw chunks; implemented by subclasses."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Render attributes back to raw chunks; implemented by subclasses."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: str | bytes) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip through deserialization; a mismatch between the
            # original and recomputed SHA indicates corruption.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the loose-object header for this object's type and size."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> "FixedSha | HASH":
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

703 

704 

class Blob(ShaFile):
    """A Git Blob object.

    A blob stores raw file contents; its serialization is the content itself,
    so _serialize/_deserialize are identity operations on the chunk list.
    """

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new Blob object."""
        super().__init__()
        self._chunked_text = []
        # Blobs serialize to their own chunks, so nothing is ever pending.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        """Return the blob contents as a single bytes object."""
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        """Replace the blob contents with *data*."""
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        """Return the blob contents as the underlying chunk list."""
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        """Replace the blob contents with a list of chunks."""
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        # The raw serialization of a blob is exactly its content chunks.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: str | bytes) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # Multiple chunks: a logical line may span a chunk boundary, so carry
        # the unterminated tail of each chunk ('remaining') into the next.
        remaining = None
        ret = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if len(lines) > 1:
                # First line completes the carried-over tail; last line may
                # itself be unterminated and becomes the new tail.
                ret.append((remaining or b"") + lines[0])
                ret.extend(lines[1:-1])
                remaining = lines[-1]
            elif len(lines) == 1:
                # Single fragment: either starts or extends the carried tail.
                if remaining is None:
                    remaining = lines.pop()
                else:
                    remaining += lines.pop()
        if remaining is not None:
            ret.append(remaining)
        return ret

802 

803 

def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
        order read from the text, possibly including duplicates. Includes a
        field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None
    v = b""
    eof = False

    def _strip_last_newline(value: bytes) -> bytes:
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)

    else:
        # for/else: this branch runs only when the loop exhausted the file
        # without hitting the blank line, i.e. there is no message body.
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()

858 

859 

860def _format_message( 

861 headers: Sequence[tuple[bytes, bytes]], body: bytes | None 

862) -> Iterator[bytes]: 

863 for field, value in headers: 

864 lines = value.split(b"\n") 

865 yield git_line(field, lines[0]) 

866 for line in lines[1:]: 

867 yield b" " + line + b"\n" 

868 yield b"\n" # There must be a new line after the headers 

869 if body: 

870 yield body 

871 

872 

873class Tag(ShaFile): 

874 """A Git Tag object.""" 

875 

876 type_name = b"tag" 

877 type_num = 4 

878 

879 __slots__ = ( 

880 "_message", 

881 "_name", 

882 "_object_class", 

883 "_object_sha", 

884 "_signature", 

885 "_tag_time", 

886 "_tag_timezone", 

887 "_tag_timezone_neg_utc", 

888 "_tagger", 

889 ) 

890 

891 _message: bytes | None 

892 _name: bytes | None 

893 _object_class: "type[ShaFile] | None" 

894 _object_sha: bytes | None 

895 _signature: bytes | None 

896 _tag_time: int | None 

897 _tag_timezone: int | None 

898 _tag_timezone_neg_utc: bool | None 

899 _tagger: bytes | None 

900 

901 def __init__(self) -> None: 

902 """Initialize a new Tag object.""" 

903 super().__init__() 

904 self._tagger = None 

905 self._tag_time = None 

906 self._tag_timezone = None 

907 self._tag_timezone_neg_utc = False 

908 self._signature: bytes | None = None 

909 

910 @classmethod 

911 def from_path(cls, filename: str | bytes) -> "Tag": 

912 """Read a tag from a file on disk. 

913 

914 Args: 

915 filename: Path to the tag file 

916 

917 Returns: 

918 A Tag object 

919 

920 Raises: 

921 NotTagError: If the file is not a tag 

922 """ 

923 tag = ShaFile.from_path(filename) 

924 if not isinstance(tag, cls): 

925 raise NotTagError(_path_to_bytes(filename)) 

926 return tag 

927 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        # Tagger is optional; validate only when present.
        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce the canonical header order: object, type, tag, tagger.
        # Each header may only follow its expected predecessor.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

966 

    def _serialize(self) -> list[bytes]:
        """Render this tag into raw chunks in canonical header order.

        The emission order (object, type, tag, tagger) is significant: it
        determines the serialized bytes and therefore the tag's SHA.

        Raises:
          ObjectFormatException: if a mandatory field is unset
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp: emit the identity alone.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        # The body is the freeform message followed by any detached signature.
        if self.message is None and self._signature is None:
            body = None
        else:
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

1000 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag.

        Args:
            chunks: Raw serialized tag content, as a list of byte chunks.

        Raises:
            ObjectFormatException: if a header is unknown, the object type
                is not recognized, or the tagger header has no value.
        """
        # Reset optional fields so a re-deserialization does not leak
        # values from a previous parse.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # field is None for the message body (everything after the
                # blank line separating headers from the body).
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    # Split the body: everything before the signature marker
                    # is the tag message, the rest is the detached signature.
                    if sig_idx is not None:
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

1051 

1052 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

1053 """Get the object pointed to by this tag. 

1054 

1055 Returns: tuple of (object class, sha). 

1056 """ 

1057 if self._object_class is None or self._object_sha is None: 

1058 raise ValueError("Tag object is not properly initialized") 

1059 return (self._object_class, self._object_sha) 

1060 

1061 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

1062 (self._object_class, self._object_sha) = value 

1063 self._needs_serialization = True 

1064 

    # Exposes the (object class, sha) pair the tag points at. The name
    # intentionally mirrors the "object" header of the tag format, shadowing
    # the builtin only within this class body.
    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

1081 

1082 def sign(self, keyid: str | None = None) -> None: 

1083 """Sign this tag with a GPG key. 

1084 

1085 Args: 

1086 keyid: Optional GPG key ID to use for signing. If not specified, 

1087 the default GPG key will be used. 

1088 """ 

1089 import gpg 

1090 

1091 with gpg.Context(armor=True) as c: 

1092 if keyid is not None: 

1093 key = c.get_key(keyid) 

1094 with gpg.Context(armor=True, signers=[key]) as ctx: 

1095 self.signature, _unused_result = ctx.sign( 

1096 self.as_raw_string(), 

1097 mode=gpg.constants.sig.mode.DETACH, 

1098 ) 

1099 else: 

1100 self.signature, _unused_result = c.sign( 

1101 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1102 ) 

1103 

1104 def raw_without_sig(self) -> bytes: 

1105 """Return raw string serialization without the GPG/SSH signature. 

1106 

1107 self.signature is a signature for the returned raw byte string serialization. 

1108 """ 

1109 ret = self.as_raw_string() 

1110 if self._signature: 

1111 ret = ret[: -len(self._signature)] 

1112 return ret 

1113 

1114 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]: 

1115 """Extract the payload, signature, and signature type from this tag. 

1116 

1117 Returns: 

1118 Tuple of (``payload``, ``signature``, ``signature_type``) where: 

1119 

1120 - ``payload``: The raw tag data without the signature 

1121 - ``signature``: The signature bytes if present, None otherwise 

1122 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1123 

1124 Raises: 

1125 ObjectFormatException: If signature has unknown format 

1126 """ 

1127 if self._signature is None: 

1128 return self.as_raw_string(), None, None 

1129 

1130 payload = self.raw_without_sig() 

1131 

1132 # Determine signature type 

1133 if self._signature.startswith(BEGIN_PGP_SIGNATURE): 

1134 sig_type = SIGNATURE_PGP 

1135 elif self._signature.startswith(BEGIN_SSH_SIGNATURE): 

1136 sig_type = SIGNATURE_SSH 

1137 else: 

1138 raise ObjectFormatException("Unknown signature format") 

1139 

1140 return payload, self._signature, sig_type 

1141 

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
            keyids: Optional iterable of trusted keyids for this tag.
                If this tag is not signed by any key in keyids verification will
                fail. If not specified, this function only verifies that the tag
                has a valid signature.

        Raises:
            gpg.errors.BadSignatures: if GPG signature verification fails
            gpg.errors.MissingSignatures: if tag was not signed by a key
                specified in keyids
        """
        # Nothing to verify on an unsigned tag.
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises BadSignatures itself on an invalid signature.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept the tag as soon as any signing-capable subkey of a
                # trusted key matches one of the signature fingerprints.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1174 

1175 

class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry.

    Attributes mirror a git tree entry: a path (bytes), a file mode (int)
    and an object SHA (bytes).
    """

    path: bytes
    mode: int
    sha: bytes

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
            path: Directory prefix (bytes) to prepend to this entry's path.

        Raises:
            TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Report the value that actually failed the check (self.path),
            # not the prefix argument.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1188 

1189 

def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
        text: Serialized text to parse
        strict: If True, enforce strict validation
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
        ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    # Each entry is: "<octal mode> <name>\0<20-byte binary sha>".
    while pos < end:
        space = text.index(b" ", pos)
        mode_text = text[pos:space]
        if strict and mode_text.startswith(b"0"):
            # git never writes zero-padded modes.
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
        nul = text.index(b"\0", space)
        name = text[space + 1 : nul]
        pos = nul + 21
        raw_sha = text[nul + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))

1220 

1221 

def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
        items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # git tree entry layout: octal mode, space, name, NUL, binary sha.
        mode_field = f"{mode:04o}".encode("ascii")
        yield mode_field + b" " + name + b"\0" + hex_to_sha(hexsha)

1233 

1234 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
        name_order: If True, iterate entries in order of their name. If
            False, iterate entries in tree order, that is, treat subtree entries as
            having '/' appended.
        entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1258 

1259 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Args:
        entry: (name, value) tuple
    """
    name, (mode, _sha) = entry
    # Directories sort as if their name had a trailing slash (tree order).
    return name + b"/" if stat.S_ISDIR(mode) else name

1270 

1271 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name

1275 

1276 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
        name: Name of the directory entry
        mode: Mode of entry
        hexsha: Hexsha of the referenced object
        encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # Anything carrying the directory bit prints as a tree, the rest as blobs.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    return (
        f"{mode:04o} {kind} {hexsha.decode('ascii')}"
        f"\t{name.decode(encoding, 'replace')}\n"
    )

1299 

1300 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Initialize SubmoduleEncountered exception.

        Args:
            path: Path where the submodule was encountered
            sha: SHA of the submodule
        """
        # Stash both pieces so callers can resume resolution inside the
        # submodule's own object store.
        self.sha = sha
        self.path = path

1313 

1314 

class Tree(ShaFile):
    """A Git tree object.

    Behaves like a mutable mapping from entry name (bytes) to a
    (mode, hexsha) tuple; serialization happens lazily on demand.
    """

    type_name = b"tree"
    type_num = 2

    # A single string is a valid __slots__ declaration for one slot.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        # name -> (mode, hexsha)
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: str | bytes) -> "Tree":
        """Read a tree from a file on disk.

        Args:
            filename: Path to the tree file

        Returns:
            A Tree object

        Raises:
            NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
            name: The name of the entry, as a string.
            value: A tuple of (mode, hexsha), where mode is the mode of the
                entry as an integral type and hexsha is the hex SHA of the entry as
                a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
            mode: The mode of the entry as an integral type. Not all
                possible modes are supported by git; see check() for details.
            name: The name of the entry, as a string.
            hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
            name_order: If True, iterate in name order instead of tree
                order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # Only the modes git itself will write are accepted.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            entry = (name, (mode, sha))
            if last:
                # Entries must be strictly increasing in tree-sort order;
                # equal keys fall through to the duplicate-name check below.
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        # Always emit entries in canonical tree order.
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
            Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
            lookup_obj: Callback for retrieving object by SHA1
            path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
            SubmoduleEncountered: if a gitlink entry is crossed mid-path
            NotTreeError: if an intermediate path component is not a tree
            ValueError: if the path contains no usable components
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id
        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            # Skip empty components (leading/trailing/double slashes).
            if not p:
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Cannot descend into a submodule; report where we stopped.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1504 

1505 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
        text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).

    Raises:
        ValueError: if the text does not start with '+' or '-' or the
            digits are not a valid integer.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        # f-string instead of the original opaque .format(**vars()).
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # The value is HHMM packed as a decimal number; use integer arithmetic
    # rather than float division.
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1533 

1534 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
        offset: Timezone offset as seconds difference to UTC
        unnecessary_negative_timezone: Whether to use a minus sign for
            UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).

    Raises:
        ValueError: if offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Integer arithmetic instead of feeding float division into %d.
    # Note: when unnecessary_negative_timezone negates a positive offset,
    # the hour field itself goes negative, reproducing git's quirky
    # "--700"-style output documented above.
    hours = offset // 3600
    minutes = (offset // 60) % 60
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")

1551 

1552 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse event.

    Args:
        value: Bytes representing a git commit/tag line
    Raises:
        ObjectFormatException in case of parsing error (malformed
        field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        close = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is the identity, no timestamp.
        return (value, None, (None, False))
    try:
        identity = value[: close + 1]
        rest = value[close + 2 :]
        timetext, tztext = rest.rsplit(b" ", 1)
        when = int(timetext)
        tz, tz_neg_utc = parse_timezone(tztext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return identity, when, (tz, tz_neg_utc)

1578 

1579 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event as "<person> <time> <timezone>"."""
    tz_offset, tz_neg_utc = timezone_info
    timestamp = str(time).encode("ascii")
    return person + b" " + timestamp + b" " + format_timezone(tz_offset, tz_neg_utc)

1588 

1589 

@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated: use Commit.from_string / Commit._deserialize instead.

    Args:
        chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    parents = []
    extra = []
    tree = None
    # (identity, timestamp, (tz offset, tz neg-utc flag)), all None until seen.
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # The embedded tag lost its trailing newline during header
            # continuation parsing; restore it before re-parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # field is None for the commit message body.
            message = value
        else:
            # Unknown headers are preserved verbatim so reserialization
            # round-trips.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1672 

1673 

class Commit(ShaFile):
    """A git commit object.

    Carries the tree snapshot, parent SHAs, author/committer identities
    with timestamps, optional encoding, mergetags, GPG signature, extra
    (unknown) headers, and the commit message.
    """

    type_name = b"commit"
    type_num = 1

    # One slot per deserialized commit field; keeps per-instance memory low.
    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

1697 

1698 def __init__(self) -> None: 

1699 """Initialize an empty Commit.""" 

1700 super().__init__() 

1701 self._parents: list[bytes] = [] 

1702 self._encoding: bytes | None = None 

1703 self._mergetag: list[Tag] = [] 

1704 self._gpgsig: bytes | None = None 

1705 self._extra: list[tuple[bytes, bytes | None]] = [] 

1706 self._author_timezone_neg_utc: bool | None = False 

1707 self._commit_timezone_neg_utc: bool | None = False 

1708 

1709 @classmethod 

1710 def from_path(cls, path: str | bytes) -> "Commit": 

1711 """Read a commit from a file on disk. 

1712 

1713 Args: 

1714 path: Path to the commit file 

1715 

1716 Returns: 

1717 A Commit object 

1718 

1719 Raises: 

1720 NotCommitError: If the file is not a commit 

1721 """ 

1722 commit = ShaFile.from_path(path) 

1723 if not isinstance(commit, cls): 

1724 raise NotCommitError(_path_to_bytes(path)) 

1725 return commit 

1726 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields from the serialized header/body chunks.

        Raises:
            ObjectFormatException: if an author or committer header has
                no value or its time entry is malformed.
        """
        # Reset all fields so re-deserialization starts from a clean slate.
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # The embedded tag lost its trailing newline during header
                # continuation parsing; restore it before re-parsing.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # field is None for the commit message body.
                self._message = value
            else:
                # Unknown headers are preserved verbatim for round-tripping.
                self._extra.append((field, value))

        # Unpack the (identity, time, (tz, neg_utc)) triples into slots.
        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

1785 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Validate header ordering with a one-field-of-lookbehind state
        # machine: each header may only follow specific predecessors.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

1836 

1837 def sign(self, keyid: str | None = None) -> None: 

1838 """Sign this commit with a GPG key. 

1839 

1840 Args: 

1841 keyid: Optional GPG key ID to use for signing. If not specified, 

1842 the default GPG key will be used. 

1843 """ 

1844 import gpg 

1845 

1846 with gpg.Context(armor=True) as c: 

1847 if keyid is not None: 

1848 key = c.get_key(keyid) 

1849 with gpg.Context(armor=True, signers=[key]) as ctx: 

1850 self.gpgsig, _unused_result = ctx.sign( 

1851 self.as_raw_string(), 

1852 mode=gpg.constants.sig.mode.DETACH, 

1853 ) 

1854 else: 

1855 self.gpgsig, _unused_result = c.sign( 

1856 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1857 ) 

1858 

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Work on a copy so the signed original stays untouched.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        # NOTE(review): the property assignment below presumably clears
        # _gpgsig as well and marks the copy for reserialization, which would
        # make the direct slot assignment above redundant — confirm against
        # the gpgsig property definition before simplifying.
        tmp.gpgsig = None
        return tmp.as_raw_string()

1869 

1870 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]: 

1871 """Extract the payload, signature, and signature type from this commit. 

1872 

1873 Returns: 

1874 Tuple of (``payload``, ``signature``, ``signature_type``) where: 

1875 

1876 - ``payload``: The raw commit data without the signature 

1877 - ``signature``: The signature bytes if present, None otherwise 

1878 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1879 

1880 Raises: 

1881 ObjectFormatException: If signature has unknown format 

1882 """ 

1883 if self._gpgsig is None: 

1884 return self.as_raw_string(), None, None 

1885 

1886 payload = self.raw_without_sig() 

1887 

1888 # Determine signature type 

1889 if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE): 

1890 sig_type = SIGNATURE_PGP 

1891 elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE): 

1892 sig_type = SIGNATURE_SSH 

1893 else: 

1894 raise ObjectFormatException("Unknown signature format") 

1895 

1896 return payload, self._gpgsig, sig_type 

1897 

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
            keyids: Optional iterable of trusted keyids for this commit.
                If this commit is not signed by any key in keyids verification will
                fail. If not specified, this function only verifies that the commit
                has a valid signature.

        Raises:
            gpg.errors.BadSignatures: if GPG signature verification fails
            gpg.errors.MissingSignatures: if commit was not signed by a key
                specified in keyids
        """
        # Nothing to verify on an unsigned commit.
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises BadSignatures itself on an invalid signature.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept the commit as soon as any signing-capable subkey of
                # a trusted key matches one of the signature fingerprints.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1930 

    def _serialize(self) -> list[bytes]:
        """Serialize this commit into its header/body chunk representation.

        Headers are emitted in canonical git order: tree, parents, author,
        committer, encoding, mergetags, extra fields, gpgsig.
        """
        headers = []
        assert self._tree is not None
        # Accept either a Tree object or a raw hex sha in _tree.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Drop the embedded tag's trailing newline; header continuation
            # formatting re-adds it on parse.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

1976 

    # Snapshot of the repository state this commit records, as a tree sha
    # (or Tree object; _serialize handles both).
    tree = serializable_property("tree", "Tree that is the state of this commit")

1979 def _get_parents(self) -> list[bytes]: 

1980 """Return a list of parents of this commit.""" 

1981 return self._parents 

1982 

1983 def _set_parents(self, value: list[bytes]) -> None: 

1984 """Set a list of parents of this commit.""" 

1985 self._needs_serialization = True 

1986 self._parents = value 

1987 

    # Read/write property; assignment goes through _set_parents, which
    # invalidates the cached serialization.
    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    # NOTE(review): @replace_me is a project decorator scheduled to remove
    # this accessor in 0.24.0 — presumably it emits a deprecation warning;
    # confirm against the decorator's definition.
    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, bytes | None]]:
        """Return extra settings of this commit."""
        return self._extra

    # Read-only property over the deprecated _get_extra accessor; kept for
    # backward compatibility while _extra is still serialized verbatim.
    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    # Remaining simple commit fields. Each is exposed through
    # serializable_property so that assigning to it marks the object as
    # needing reserialization (see _needs_serialization).
    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

# The concrete git object classes this module knows how to parse.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Lookup table from either the textual type name (e.g. b"commit") or the
# numeric type code to the corresponding object class.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {
    type_key: object_class
    for object_class in OBJECT_CLASSES
    for type_key in (object_class.type_name, object_class.type_num)
}

2054 

# Keep references to the pure-Python implementations so tests can exercise
# them even when the accelerated versions are installed.
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items

try:
    # Prefer the Rust implementations when the extension module is built.
    from dulwich._objects import parse_tree as _parse_tree_rs
    from dulwich._objects import sorted_tree_items as _sorted_tree_items_rs
except ImportError:
    # Extension unavailable: keep the pure-Python implementations bound.
    pass
else:
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs