Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1020 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25import binascii 

26import os 

27import posixpath 

28import stat 

29import sys 

30import zlib 

31from collections.abc import Callable, Iterable, Iterator, Sequence 

32from hashlib import sha1 

33from io import BufferedIOBase, BytesIO 

34from typing import ( 

35 IO, 

36 TYPE_CHECKING, 

37 NamedTuple, 

38 Optional, 

39 TypeVar, 

40 Union, 

41) 

42 

43if sys.version_info >= (3, 11): 

44 from typing import Self 

45else: 

46 from typing_extensions import Self 

47 

48if sys.version_info >= (3, 10): 

49 from typing import TypeGuard 

50else: 

51 from typing_extensions import TypeGuard 

52 

53from . import replace_me 

54from .errors import ( 

55 ChecksumMismatch, 

56 FileFormatException, 

57 NotBlobError, 

58 NotCommitError, 

59 NotTagError, 

60 NotTreeError, 

61 ObjectFormatException, 

62) 

63from .file import GitFile 

64 

65if TYPE_CHECKING: 

66 from _hashlib import HASH 

67 

68 from .file import _GitFile 

69 

# Hex SHA-1 consisting of all zeros: git's sentinel for "no object"
# (e.g. the old side of a ref creation).
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"tag"[0:0] + b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# File-mode format bits marking a tree entry as a gitlink (submodule).
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Signature delimiters looked for in tag/commit message bodies.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"


# Object IDs are 40-byte hex SHAs, represented as bytes throughout.
ObjectID = bytes

102 

103 

class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised when an on-disk object file contains no data at all, which
    indicates corruption rather than a valid (if odd) object.
    """

106 

107 

def S_ISGITLINK(m: int) -> bool:
    """Return whether a file mode denotes a gitlink (submodule) entry.

    Args:
      m: mode bits to inspect
    Returns: a ``boolean``
    """
    fmt_bits = stat.S_IFMT(m)
    return fmt_bits == S_IFGITLINK

116 

117 

118def _decompress(string: bytes) -> bytes: 

119 dcomp = zlib.decompressobj() 

120 dcomped = dcomp.decompress(string) 

121 dcomped += dcomp.flush() 

122 return dcomped 

123 

124 

def sha_to_hex(sha: ObjectID) -> bytes:
    """Return the 40-character hex form of a 20-byte binary sha."""
    encoded = binascii.hexlify(sha)
    assert len(encoded) == 40, f"Incorrect length of sha1 string: {encoded!r}"
    return encoded

130 

131 

def hex_to_sha(hex: Union[bytes, str]) -> bytes:
    """Convert a 40-character hex sha into its 20-byte binary form."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        return binascii.unhexlify(hex)
    except TypeError as exc:
        # Normalize TypeError to ValueError only for bytes input; for other
        # types the TypeError is the caller's bug and propagates unchanged.
        if isinstance(hex, bytes):
            raise ValueError(exc.args[0]) from exc
        raise

141 

142 

def valid_hexsha(hex: Union[bytes, str]) -> bool:
    """Report whether *hex* is a well-formed 40-character hex SHA.

    Args:
      hex: Hex string to check

    Returns:
      True if valid hex SHA, False otherwise
    """
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
    except (TypeError, binascii.Error):
        return False
    return True

160 

161 

# Constrained type variable: path arguments are either str or bytes, and the
# return type of functions using it matches the argument type.
PathT = TypeVar("PathT", str, bytes)

163 

164 

def hex_to_filename(path: PathT, hex: Union[str, bytes]) -> PathT:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join requires all components to share one type, so coerce the
    # sha to whichever type the base path uses before fanning it out into the
    # 2-character directory plus 38-character filename layout.
    if isinstance(path, str):
        sha_str = hex if isinstance(hex, str) else hex.decode("ascii")
        joined = os.path.join(path, sha_str[:2], sha_str[2:])
        assert isinstance(joined, str)
        return joined
    # path is bytes
    sha_bytes = hex if isinstance(hex, bytes) else hex.encode("ascii")
    joined_b = os.path.join(path, sha_bytes[:2], sha_bytes[2:])
    assert isinstance(joined_b, bytes)
    return joined_b

191 

192 

def filename_to_hex(filename: Union[str, bytes]) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        # grab the last (up to) two path components
        parts = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        fan_out, remainder = parts
        assert len(fan_out) == 2 and len(remainder) == 38, errmsg
        hex_bytes = (fan_out + remainder).encode("ascii")
    else:
        # filename is bytes; coerce the separator to match
        sep = os.path.sep
        sep_b = sep.encode("ascii") if isinstance(sep, str) else sep
        parts_b = filename.rsplit(sep_b, 2)[-2:]
        assert len(parts_b) == 2, errmsg
        fan_out_b, remainder_b = parts_b
        assert len(fan_out_b) == 2 and len(remainder_b) == 38, errmsg
        hex_bytes = fan_out_b + remainder_b
    # Decode purely to validate the hex digits; raises on bad input.
    binascii.unhexlify(hex_bytes)
    return hex_bytes.decode("ascii")

216 

217 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    obj_cls = object_class(num_type)
    if obj_cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    size = str(length).encode("ascii")
    return obj_cls.type_name + b" " + size + b"\0"

224 

225 

def serializable_property(name: str, docstring: Optional[str] = None) -> property:
    """A property that helps tracking whether serialization is necessary."""
    attr = "_" + name

    def _setter(obj: "ShaFile", value: object) -> None:
        """Store *value* and flag the object as needing reserialization.

        Args:
          obj: The ShaFile object
          value: The value to set
        """
        setattr(obj, attr, value)
        obj._needs_serialization = True

    def _getter(obj: "ShaFile") -> object:
        """Return the stored value.

        Args:
          obj: The ShaFile object

        Returns:
          The property value
        """
        return getattr(obj, attr)

    return property(_getter, _setter, doc=docstring)

251 

252 

def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
    """Look up the ShaFile subclass for a type name or numeric type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)

262 

263 

def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")

275 

276 

def check_identity(identity: Optional[bytes], error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is missing or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Use short-circuiting ``and`` rather than ``all([...])``: building the
    # list eagerly evaluated ``identity[email_start - 1]`` even when
    # email_start was 0 or -1, indexing from the end of the string and
    # raising IndexError (instead of ObjectFormatException) for identities
    # shorter than two bytes.
    valid = (
        email_start >= 1
        and identity[email_start - 1] == b" "[0]
        and identity.find(b"<", email_start + 1) == -1
        and email_end == len(identity) - 1
        and b"\0" not in identity
        and b"\n" not in identity
    )
    if not valid:
        raise ObjectFormatException(error_msg)

301 

302 

303def _path_to_bytes(path: Union[str, bytes]) -> bytes: 

304 """Convert a path to bytes for use in error messages.""" 

305 if isinstance(path, str): 

306 return path.encode("utf-8", "surrogateescape") 

307 return path 

308 

309 

def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Reject values beyond the signed 64-bit range to prevent overflow.
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

322 

323 

def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    joined = b" ".join(items)
    return joined + b"\n"

327 

328 

class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: Union[str, bytes]) -> None:
        """Store a predetermined SHA value.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        if isinstance(hexsha, str):
            hexsha = hexsha.encode("ascii")
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        # Inline hex_to_sha: validate the length, then decode the hex digits.
        assert len(hexsha) == 40, f"Incorrect length of hexsha: {hexsha!r}"
        self._sha = binascii.unhexlify(hexsha)

    def digest(self) -> bytes:
        """Return the raw (binary) SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the SHA digest in hex form."""
        return self._hexsha.decode("ascii")

354 

355 

# Type guard functions for runtime type narrowing
if TYPE_CHECKING:
    # Static-analysis variants: TypeGuard lets a type checker narrow a
    # ShaFile reference to the concrete subclass after a successful call.
    # These bodies are never executed at runtime.
    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"

391 

392 

class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when parsed attributes changed since the last (de)serialization,
    # so _chunked_text must be regenerated before use.
    _needs_serialization: bool
    # Byte name of the object type (b"commit", b"tree", ...); set by subclasses.
    type_name: bytes
    # Numeric object type; set by subclasses.
    type_num: int
    # Cached serialized representation, as a list of byte chunks.
    _chunked_text: Optional[list[bytes]]
    # Cached SHA object, or None when it must be recomputed.
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep reading/decompressing until the NUL that ends the header appears.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL-terminated header is the object payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level (-1 for zlib's default)
        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA: it no longer matches the new text.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
        """Set the contents of this object from a serialized string.

        Args:
          text: complete serialized object contents
          sha: optional known SHA to cache instead of recomputing it
        """
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> None:
        """Set the contents of this object from a list of chunks.

        Args:
          chunks: serialized chunks of the object contents
          sha: optional known SHA to cache instead of recomputing it
        """
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # The numeric object type lives in bits 4-6 of the first byte.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is varint-encoded: a set high bit means "more bytes follow".
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True if the first two bytes look like a bare zlib stream.

        Legacy loose objects are zlib-wrapped directly, so they start with a
        zlib header: compression method 8, with the two header bytes (read as
        a big-endian 16-bit value) divisible by 31 (RFC 1950 FCHECK).
        """
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Read an object from an open file, dispatching on its on-disk format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Set this object's attributes from serialized chunks (subclass hook)."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Produce serialized chunks from this object's attributes (subclass hook)."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip the serialization to verify it parses cleanly.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the loose-object header ("<type> <size>\\0") for this object."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

710 

711 

class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new, empty Blob object."""
        super().__init__()
        self._chunked_text = []
        # A fresh blob's (empty) serialization is already up to date.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        """Return the blob contents as a single bytes object."""
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        """Replace the blob contents with *data*."""
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        """Return the blob contents as a list of chunks."""
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        """Replace the blob contents with the given chunks."""
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        """Blobs serialize to their raw content, unchanged."""
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Blobs deserialize by storing the raw chunks directly."""
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        # Any byte sequence is a valid blob, so only the base checks apply.
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            # Fast path: a single chunk can be split directly.
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # A line may straddle chunk boundaries; `remaining` carries the
        # trailing partial line of each chunk over into the next one.
        remaining = None
        ret = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if len(lines) > 1:
                ret.append((remaining or b"") + lines[0])
                ret.extend(lines[1:-1])
                remaining = lines[-1]
            elif len(lines) == 1:
                if remaining is None:
                    remaining = lines.pop()
                else:
                    remaining += lines.pop()
        if remaining is not None:
            ret.append(remaining)
        return ret

809 

810 

811def _parse_message( 

812 chunks: Iterable[bytes], 

813) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]: 

814 """Parse a message with a list of fields and a body. 

815 

816 Args: 

817 chunks: the raw chunks of the tag or commit object. 

818 Returns: iterator of tuples of (field, value), one per header line, in the 

819 order read from the text, possibly including duplicates. Includes a 

820 field named None for the freeform tag/commit text. 

821 """ 

822 f = BytesIO(b"".join(chunks)) 

823 k = None 

824 v = b"" 

825 eof = False 

826 

827 def _strip_last_newline(value: bytes) -> bytes: 

828 """Strip the last newline from value.""" 

829 if value and value.endswith(b"\n"): 

830 return value[:-1] 

831 return value 

832 

833 # Parse the headers 

834 # 

835 # Headers can contain newlines. The next line is indented with a space. 

836 # We store the latest key as 'k', and the accumulated value as 'v'. 

837 for line in f: 

838 if line.startswith(b" "): 

839 # Indented continuation of the previous line 

840 v += line[1:] 

841 else: 

842 if k is not None: 

843 # We parsed a new header, return its value 

844 yield (k, _strip_last_newline(v)) 

845 if line == b"\n": 

846 # Empty line indicates end of headers 

847 break 

848 (k, v) = line.split(b" ", 1) 

849 

850 else: 

851 # We reached end of file before the headers ended. We still need to 

852 # return the previous header, then we need to return a None field for 

853 # the text. 

854 eof = True 

855 if k is not None: 

856 yield (k, _strip_last_newline(v)) 

857 yield (None, None) 

858 

859 if not eof: 

860 # We didn't reach the end of file while parsing headers. We can return 

861 # the rest of the file as a message. 

862 yield (None, f.read()) 

863 

864 f.close() 

865 

866 

867def _format_message( 

868 headers: Sequence[tuple[bytes, bytes]], body: Optional[bytes] 

869) -> Iterator[bytes]: 

870 for field, value in headers: 

871 lines = value.split(b"\n") 

872 yield git_line(field, lines[0]) 

873 for line in lines[1:]: 

874 yield b" " + line + b"\n" 

875 yield b"\n" # There must be a new line after the headers 

876 if body: 

877 yield body 

878 

879 

class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    # Declared types for the slot attributes above.
    _message: Optional[bytes]
    _name: Optional[bytes]
    _object_class: Optional[type["ShaFile"]]
    _object_sha: Optional[bytes]
    _signature: Optional[bytes]
    _tag_time: Optional[int]
    _tag_timezone: Optional[int]
    # True when the timezone is "-0000" (negative UTC), which git
    # distinguishes from plain "+0000".
    _tag_timezone_neg_utc: Optional[bool]
    _tagger: Optional[bytes]

907 

    def __init__(self) -> None:
        """Initialize a new Tag object.

        Tagger/time/signature fields default to None; _name, _object_sha
        and _object_class are left unset until deserialization or explicit
        assignment.
        """
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: Optional[bytes] = None

916 

917 @classmethod 

918 def from_path(cls, filename: Union[str, bytes]) -> "Tag": 

919 """Read a tag from a file on disk. 

920 

921 Args: 

922 filename: Path to the tag file 

923 

924 Returns: 

925 A Tag object 

926 

927 Raises: 

928 NotTagError: If the file is not a tag 

929 """ 

930 tag = ShaFile.from_path(filename) 

931 if not isinstance(tag, cls): 

932 raise NotTagError(_path_to_bytes(filename)) 

933 return tag 

934 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        # The tagger header is optional; validate it only when present.
        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Verify the headers appear in canonical order:
        # object, then type, then tag, then (optionally) tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

973 

    def _serialize(self) -> list[bytes]:
        """Serialize this tag into raw chunks (headers, message, signature).

        Raises:
          ObjectFormatException: if a mandatory field is unset
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # No timestamp recorded: emit the bare tagger identity.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # The signature (if any) is stored appended to the message body.
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

1007 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag.

        Parses object/type/tag/tagger headers and splits the message body
        from an optional trailing PGP or SSH signature.

        Raises:
          ObjectFormatException: on an unknown header field or a tagger
            header with no value.
        """
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # field is None for the message body (everything after the
                # header block).
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        # Split the body where the signature block starts.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

1058 

1059 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

1060 """Get the object pointed to by this tag. 

1061 

1062 Returns: tuple of (object class, sha). 

1063 """ 

1064 if self._object_class is None or self._object_sha is None: 

1065 raise ValueError("Tag object is not properly initialized") 

1066 return (self._object_class, self._object_sha) 

1067 

1068 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

1069 (self._object_class, self._object_sha) = value 

1070 self._needs_serialization = True 

1071 

    # ``object`` exposes the (object class, object sha) pair as one
    # attribute; assigning to it marks the tag for re-serialization.
    object = property(_get_object, _set_object)

    # Serialized header fields exposed as attributes; setting any of them
    # triggers re-serialization (see serializable_property).
    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

1088 

1089 def sign(self, keyid: Optional[str] = None) -> None: 

1090 """Sign this tag with a GPG key. 

1091 

1092 Args: 

1093 keyid: Optional GPG key ID to use for signing. If not specified, 

1094 the default GPG key will be used. 

1095 """ 

1096 import gpg 

1097 

1098 with gpg.Context(armor=True) as c: 

1099 if keyid is not None: 

1100 key = c.get_key(keyid) 

1101 with gpg.Context(armor=True, signers=[key]) as ctx: 

1102 self.signature, _unused_result = ctx.sign( 

1103 self.as_raw_string(), 

1104 mode=gpg.constants.sig.mode.DETACH, 

1105 ) 

1106 else: 

1107 self.signature, _unused_result = c.sign( 

1108 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1109 ) 

1110 

1111 def raw_without_sig(self) -> bytes: 

1112 """Return raw string serialization without the GPG/SSH signature. 

1113 

1114 self.signature is a signature for the returned raw byte string serialization. 

1115 """ 

1116 ret = self.as_raw_string() 

1117 if self._signature: 

1118 ret = ret[: -len(self._signature)] 

1119 return ret 

1120 

1121 def extract_signature(self) -> tuple[bytes, Optional[bytes], Optional[bytes]]: 

1122 """Extract the payload, signature, and signature type from this tag. 

1123 

1124 Returns: 

1125 Tuple of (payload, signature, signature_type) where: 

1126 - payload: The raw tag data without the signature 

1127 - signature: The signature bytes if present, None otherwise 

1128 - signature_type: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1129 

1130 Raises: 

1131 ObjectFormatException: If signature has unknown format 

1132 """ 

1133 if self._signature is None: 

1134 return self.as_raw_string(), None, None 

1135 

1136 payload = self.raw_without_sig() 

1137 

1138 # Determine signature type 

1139 if self._signature.startswith(BEGIN_PGP_SIGNATURE): 

1140 sig_type = SIGNATURE_PGP 

1141 elif self._signature.startswith(BEGIN_SSH_SIGNATURE): 

1142 sig_type = SIGNATURE_SSH 

1143 else: 

1144 raise ObjectFormatException("Unknown signature format") 

1145 

1146 return payload, self._signature, sig_type 

1147 

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        # Unsigned tags pass trivially; there is nothing to verify.
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises BadSignatures if the signature is invalid.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # matches one of the signature fingerprints.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1180 

1181 

class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry.

    Attributes are ``path`` (entry name as bytes), ``mode`` (file mode as
    int), and ``sha`` (hex SHA as bytes).
    """

    path: bytes
    mode: int
    sha: bytes

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Path prefix to join in front of this entry's path.

        Returns:
          A new TreeEntry whose path is ``path`` joined with ``self.path``.

        Raises:
          TypeError: If this entry's path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the offending value (self.path). The old
            # message interpolated the (bytes-typed) prefix argument instead.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1194 

1195 

def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, enforce strict validation
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Each entry is: "<octal mode> <name>\0<20-byte raw sha>".
        space = text.index(b" ", pos)
        raw_mode = text[pos:space]
        if strict and raw_mode.startswith(b"0"):
            # git never writes leading zeros in modes.
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}")
        try:
            mode = int(raw_mode, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}") from exc
        nul = text.index(b"\0", space)
        name = text[space + 1 : nul]
        pos = nul + 21
        sha = text[nul + 1 : pos]
        if len(sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(sha))

1226 

1227 

def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # Entry format: "<octal mode> <name>\0<raw 20-byte sha>".
        mode_bytes = f"{mode:04o}".encode("ascii")
        yield mode_bytes + b" " + name + b"\0" + hex_to_sha(hexsha)

1239 

1240 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    sort_key = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=sort_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1264 

1265 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in tree order.

    Directories sort as if their name ended in '/'.

    Args:
      entry: (name, (mode, sha)) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1276 

1277 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name

1281 

1282 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # Anything with the directory bit set is shown as a tree.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1305 

1306 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Initialize SubmoduleEncountered exception.

        Args:
          path: Path where the submodule was encountered
          sha: SHA of the submodule
        """
        # Path within the tree at which the gitlink entry was found.
        self.path = path
        # SHA recorded for the submodule entry.
        self.sha = sha

1319 

1320 

class Tree(ShaFile):
    """A Git tree object.

    Behaves as a mutable mapping from entry name (bytes) to a
    (mode, hexsha) tuple; mutation marks the object for re-serialization.
    """

    type_name = b"tree"
    type_num = 2

    # Single slot: the name -> (mode, hexsha) mapping.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # The only entry modes git itself writes.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be unique and sorted in tree order.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        # Serialization always uses tree order (see iteritems default).
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if a gitlink entry is crossed mid-path
          NotTreeError: if an intermediate component is not a tree
          ValueError: if the path contains no valid components
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            # Skip empty components (leading/trailing/double slashes).
            if not p:
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Cannot descend into a submodule from here.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1510 

1511 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    magnitude = int(text[1:])
    offset = -magnitude if sign == b"-" else magnitude
    # "-0000" (and "-" with a negative body) is remembered so it can be
    # round-tripped on output.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # The value is HHMM packed as a decimal number.
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1539 

1540 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Render as [sign]HHMM.
    hours, minutes = divmod(offset // 60, 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")

1557 

1558 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse event.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
        field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: treat the whole value as the identity with no
        # timestamp information.
        return (value, None, (None, False))
    person = value[: sep + 1]
    rest = value[sep + 2 :]
    try:
        timetext, tztext = rest.rsplit(b" ", 1)
        time = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(tztext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, time, (timezone, timezone_neg_utc)

1584 

1585 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format a (person, timestamp, timezone) triple as a header value."""
    tz_offset, tz_neg_utc = timezone_info
    stamp = str(time).encode("ascii")
    return person + b" " + stamp + b" " + format_timezone(tz_offset, tz_neg_utc)

1594 

1595 

@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated (see the ``replace_me`` decorator: since 0.21.0, scheduled
    for removal in 0.24.0).

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    parents = []
    extra = []
    tree = None
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Restore the trailing newline before re-parsing the embedded tag.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # field is None for the message body.
            message = value
        else:
            # Unknown headers are kept verbatim.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1674 

1675 

class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    # All fields parsed from / serialized into the commit object.
    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

1699 

1700 def __init__(self) -> None: 

1701 """Initialize an empty Commit.""" 

1702 super().__init__() 

1703 self._parents: list[bytes] = [] 

1704 self._encoding: Optional[bytes] = None 

1705 self._mergetag: list[Tag] = [] 

1706 self._gpgsig: Optional[bytes] = None 

1707 self._extra: list[tuple[bytes, Optional[bytes]]] = [] 

1708 self._author_timezone_neg_utc: Optional[bool] = False 

1709 self._commit_timezone_neg_utc: Optional[bool] = False 

1710 

1711 @classmethod 

1712 def from_path(cls, path: Union[str, bytes]) -> "Commit": 

1713 """Read a commit from a file on disk. 

1714 

1715 Args: 

1716 path: Path to the commit file 

1717 

1718 Returns: 

1719 A Commit object 

1720 

1721 Raises: 

1722 NotCommitError: If the file is not a commit 

1723 """ 

1724 commit = ShaFile.from_path(path) 

1725 if not isinstance(commit, cls): 

1726 raise NotCommitError(_path_to_bytes(path)) 

1727 return commit 

1728 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Parse commit fields (tree, parents, author, etc.) from raw chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Restore the newline stripped by _serialize before parsing
                # the embedded tag object.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # field is None for the commit message body.
                self._message = value
            else:
                # Unknown headers are preserved verbatim in _extra.
                self._extra.append((field, value))

        # Unpack the (identity, time, (timezone, neg_utc)) triples.
        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

1783 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Validate header ordering: tree first, then parents, author,
        # committer, and (optionally) encoding.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

    # TODO: optionally check for duplicate parents

1834 

1835 def sign(self, keyid: Optional[str] = None) -> None: 

1836 """Sign this commit with a GPG key. 

1837 

1838 Args: 

1839 keyid: Optional GPG key ID to use for signing. If not specified, 

1840 the default GPG key will be used. 

1841 """ 

1842 import gpg 

1843 

1844 with gpg.Context(armor=True) as c: 

1845 if keyid is not None: 

1846 key = c.get_key(keyid) 

1847 with gpg.Context(armor=True, signers=[key]) as ctx: 

1848 self.gpgsig, _unused_result = ctx.sign( 

1849 self.as_raw_string(), 

1850 mode=gpg.constants.sig.mode.DETACH, 

1851 ) 

1852 else: 

1853 self.gpgsig, _unused_result = c.sign( 

1854 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1855 ) 

1856 

1857 def raw_without_sig(self) -> bytes: 

1858 """Return raw string serialization without the GPG/SSH signature. 

1859 

1860 self.gpgsig is a signature for the returned raw byte string serialization. 

1861 """ 

1862 tmp = self.copy() 

1863 assert isinstance(tmp, Commit) 

1864 tmp._gpgsig = None 

1865 tmp.gpgsig = None 

1866 return tmp.as_raw_string() 

1867 

1868 def extract_signature(self) -> tuple[bytes, Optional[bytes], Optional[bytes]]: 

1869 """Extract the payload, signature, and signature type from this commit. 

1870 

1871 Returns: 

1872 Tuple of (payload, signature, signature_type) where: 

1873 - payload: The raw commit data without the signature 

1874 - signature: The signature bytes if present, None otherwise 

1875 - signature_type: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature 

1876 

1877 Raises: 

1878 ObjectFormatException: If signature has unknown format 

1879 """ 

1880 if self._gpgsig is None: 

1881 return self.as_raw_string(), None, None 

1882 

1883 payload = self.raw_without_sig() 

1884 

1885 # Determine signature type 

1886 if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE): 

1887 sig_type = SIGNATURE_PGP 

1888 elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE): 

1889 sig_type = SIGNATURE_SSH 

1890 else: 

1891 raise ObjectFormatException("Unknown signature format") 

1892 

1893 return payload, self._gpgsig, sig_type 

1894 

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        # Unsigned commits pass trivially; there is nothing to verify.
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises BadSignatures if the signature is invalid.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # matches one of the signature fingerprints.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1927 

    def _serialize(self) -> list[bytes]:
        """Serialize this commit's headers and message into raw byte chunks."""
        headers = []
        assert self._tree is not None
        # The tree field may hold either a Tree object or a raw hex sha.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the trailing newline; _deserialize adds it back.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        # Unknown headers captured at parse time are re-emitted verbatim.
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

1973 

    # May hold a Tree object or a raw sha (both are accepted by _serialize).
    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit.

        NOTE(review): this returns the internal list itself, not a copy —
        in-place mutation by a caller would bypass _needs_serialization.
        """
        return self._parents

1979 

1980 def _set_parents(self, value: list[bytes]) -> None: 

1981 """Set a list of parents of this commit.""" 

1982 self._needs_serialization = True 

1983 self._parents = value 

1984 

    # Property wrapper so assigning to .parents also invalidates the
    # cached serialization (via _set_parents).
    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

1990 

    # Deprecated accessor (per the replace_me since/remove_in bounds);
    # retained as the getter backing the public ``extra`` property.
    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

1995 

    # Read-only view of unrecognized headers; see doc= text below for why
    # callers should prefer message pseudo-headers for private metadata.
    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

2003 

    # Identity fields.
    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    # Timestamp fields — seconds since the epoch plus a timezone offset each.
    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    # Optional metadata fields.
    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

2036 

2037 

# The concrete git object classes, used to build the type lookup table below.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

2044 

# Lookup table mapping both a class's textual type name and its numeric
# type id to the class itself.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP.update({cls.type_name: cls, cls.type_num: cls})

2050 

2051 

2052# Hold on to the pure-python implementations for testing 

2053_parse_tree_py = parse_tree 

2054_sorted_tree_items_py = sorted_tree_items 

2055try: 

2056 # Try to import Rust versions 

2057 from dulwich._objects import ( 

2058 parse_tree as _parse_tree_rs, 

2059 ) 

2060 from dulwich._objects import ( 

2061 sorted_tree_items as _sorted_tree_items_rs, 

2062 ) 

2063except ImportError: 

2064 pass 

2065else: 

2066 parse_tree = _parse_tree_rs 

2067 sorted_tree_items = _sorted_tree_items_rs