Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1053 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

"""Access to base git objects."""

# Public API of this module; star-imports and documentation tools rely on it.
__all__ = [
    "BEGIN_PGP_SIGNATURE",
    "BEGIN_SSH_SIGNATURE",
    "MAX_TIME",
    "OBJECT_CLASSES",
    "SIGNATURE_PGP",
    "SIGNATURE_SSH",
    "S_IFGITLINK",
    "S_ISGITLINK",
    "ZERO_SHA",
    "Blob",
    "Commit",
    "EmptyFileException",
    "FixedSha",
    "ObjectID",
    "RawObjectID",
    "ShaFile",
    "SubmoduleEncountered",
    "Tag",
    "Tree",
    "TreeEntry",
    "check_hexsha",
    "check_identity",
    "check_time",
    "filename_to_hex",
    "format_time_entry",
    "format_timezone",
    "git_line",
    "hex_to_filename",
    "hex_to_sha",
    "is_blob",
    "is_commit",
    "is_tag",
    "is_tree",
    "key_entry",
    "key_entry_name_order",
    "object_class",
    "object_header",
    "parse_commit_broken",
    "parse_tree",
    "pretty_format_tree_entry",
    "serializable_property",
    "serialize_tree",
    "sha_to_hex",
    "sorted_tree_items",
    "valid_hexsha",
]

72 

73import binascii 

74import os 

75import posixpath 

76import re 

77import stat 

78import sys 

79import zlib 

80from collections.abc import Callable, Iterable, Iterator, Sequence 

81from hashlib import sha1 

82from io import BufferedIOBase, BytesIO 

83from typing import ( 

84 IO, 

85 TYPE_CHECKING, 

86 NamedTuple, 

87 TypeVar, 

88) 

89 

90if sys.version_info >= (3, 11): 

91 from typing import Self 

92else: 

93 from typing_extensions import Self 

94 

95from typing import NewType, TypeGuard 

96 

97from .errors import ( 

98 ChecksumMismatch, 

99 FileFormatException, 

100 NotBlobError, 

101 NotCommitError, 

102 NotTagError, 

103 NotTreeError, 

104 ObjectFormatException, 

105) 

106from .file import GitFile 

107from .object_format import DEFAULT_OBJECT_FORMAT, ObjectFormat 

108 

109if TYPE_CHECKING: 

110 from _hashlib import HASH 

111 

112 from .file import _GitFile 

113 

114# Zero SHA constants for backward compatibility - now defined below as ObjectID 

115 

116 

117# Header fields for commits 

118_TREE_HEADER = b"tree" 

119_PARENT_HEADER = b"parent" 

120_AUTHOR_HEADER = b"author" 

121_COMMITTER_HEADER = b"committer" 

122_ENCODING_HEADER = b"encoding" 

123_MERGETAG_HEADER = b"mergetag" 

124_GPGSIG_HEADER = b"gpgsig" 

125 

126# Header fields for objects 

127_OBJECT_HEADER = b"object" 

128_TYPE_HEADER = b"type" 

129_TAG_HEADER = b"tag" 

130_TAGGER_HEADER = b"tagger" 

131 

132 

133S_IFGITLINK = 0o160000 

134 

135# Intentionally flexible regex to support various types of brokenness 

136# in commit/tag author/committer/tagger lines 

137_TIME_ENTRY_RE = re.compile( 

138 b"^(?P<person>.*) (?P<time>-?[0-9]+) (?P<timezone>[+-]{0,2}[0-9]+)$" 

139) 

140 

141 

142MAX_TIME = 9223372036854775807 # (2**63) - 1 - signed long int max 

143 

144BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----" 

145BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----" 

146 

147# Signature type constants 

148SIGNATURE_PGP = b"pgp" 

149SIGNATURE_SSH = b"ssh" 

150 

151 

152# Hex SHA type 

153ObjectID = NewType("ObjectID", bytes) 

154 

155# Raw SHA type 

156RawObjectID = NewType("RawObjectID", bytes) 

157 

158# Zero SHA constant 

159ZERO_SHA: ObjectID = ObjectID(b"0" * 40) 

160 

161 

class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""

164 

165 

def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    # Compare only the file-type bits of the mode against the gitlink type.
    return stat.S_IFMT(m) == S_IFGITLINK

174 

175 

176def _decompress(string: bytes) -> bytes: 

177 dcomp = zlib.decompressobj() 

178 dcomped = dcomp.decompress(string) 

179 dcomped += dcomp.flush() 

180 return dcomped 

181 

182 

def sha_to_hex(sha: RawObjectID) -> ObjectID:
    """Takes a string and returns the hex of the sha within.

    Args:
      sha: Raw (binary) SHA digest.
    Returns: Hex-encoded object ID.
    Raises:
      ValueError: if the hex form is not 40 (SHA1) or 64 (SHA256) chars.
    """
    hexsha = binascii.hexlify(sha)
    # Support both SHA1 (40 chars) and SHA256 (64 chars)
    if len(hexsha) not in (40, 64):
        raise ValueError(f"Incorrect length of sha string: {hexsha!r}")
    return ObjectID(hexsha)

190 

191 

def hex_to_sha(hex: ObjectID | str) -> RawObjectID:
    """Takes a hex sha and returns a binary sha.

    Args:
      hex: Hex-encoded SHA (40 chars for SHA1, 64 for SHA256).
    Raises:
      ValueError: on wrong length or invalid hex digits.
    """
    # Support both SHA1 (40 chars) and SHA256 (64 chars)
    if len(hex) not in (40, 64):
        raise ValueError(f"Incorrect length of hexsha: {hex!r}")
    try:
        return RawObjectID(binascii.unhexlify(hex))
    except TypeError as exc:
        # unhexlify raises TypeError for non-ascii-compatible input; only
        # translate it to ValueError for bytes input, otherwise re-raise.
        if not isinstance(hex, bytes):
            raise
        raise ValueError(exc.args[0]) from exc

203 

204 

205def valid_hexsha(hex: bytes | str) -> bool: 

206 """Check if a hex string is a valid SHA1 or SHA256. 

207 

208 Args: 

209 hex: Hex string to validate 

210 

211 Returns: 

212 True if valid SHA1 (40 chars) or SHA256 (64 chars), False otherwise 

213 """ 

214 if len(hex) not in (40, 64): 

215 return False 

216 try: 

217 binascii.unhexlify(hex) 

218 except (TypeError, binascii.Error): 

219 return False 

220 else: 

221 return True 

222 

223 

224PathT = TypeVar("PathT", str, bytes) 

225 

226 

227def hex_to_filename(path: PathT, hex: str | bytes) -> PathT: 

228 """Takes a hex sha and returns its filename relative to the given path.""" 

229 # os.path.join accepts bytes or unicode, but all args must be of the same 

230 # type. Make sure that hex which is expected to be bytes, is the same type 

231 # as path. 

232 if isinstance(path, str): 

233 if isinstance(hex, bytes): 

234 hex_str = hex.decode("ascii") 

235 else: 

236 hex_str = hex 

237 dir_name = hex_str[:2] 

238 file_name = hex_str[2:] 

239 result = os.path.join(path, dir_name, file_name) 

240 assert isinstance(result, str) 

241 return result 

242 else: 

243 # path is bytes 

244 if isinstance(hex, str): 

245 hex_bytes = hex.encode("ascii") 

246 else: 

247 hex_bytes = hex 

248 dir_name_b = hex_bytes[:2] 

249 file_name_b = hex_bytes[2:] 

250 result_b = os.path.join(path, dir_name_b, file_name_b) 

251 assert isinstance(result_b, bytes) 

252 return result_b 

253 

254 

def filename_to_hex(filename: str | bytes) -> str:
    """Takes an object filename and returns its corresponding hex sha.

    Args:
      filename: Path ending in ``<2-char dir>/<38-char file>`` (SHA1 loose
        object layout).
    Returns: The 40-char hex SHA as a str.
    """
    # grab the last (up to) two path components
    # NOTE(review): validation below uses `assert`, which is stripped under
    # python -O; presumably callers pass trusted paths -- confirm.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        names = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(names) == 2, errmsg
        base, rest = names
        assert len(base) == 2 and len(rest) == 38, errmsg
        hex_str = base + rest
        hex_bytes = hex_str.encode("ascii")
    else:
        # filename is bytes
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        names_b = filename.rsplit(sep, 2)[-2:]
        assert len(names_b) == 2, errmsg
        base_b, rest_b = names_b
        assert len(base_b) == 2 and len(rest_b) == 38, errmsg
        hex_bytes = base_b + rest_b
    # Validates the hex digits; raises ValueError if malformed.
    hex_to_sha(ObjectID(hex_bytes))
    return hex_bytes.decode("ascii")

278 

279 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length.

    The header has the form ``<type-name> <length>\\0``.
    """
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    return cls.type_name + b" " + str(length).encode("ascii") + b"\0"

286 

287 

288def serializable_property(name: str, docstring: str | None = None) -> property: 

289 """A property that helps tracking whether serialization is necessary.""" 

290 

291 def set(obj: "ShaFile", value: object) -> None: 

292 """Set the property value and mark the object as needing serialization. 

293 

294 Args: 

295 obj: The ShaFile object 

296 value: The value to set 

297 """ 

298 setattr(obj, "_" + name, value) 

299 obj._needs_serialization = True 

300 

301 def get(obj: "ShaFile") -> object: 

302 """Get the property value. 

303 

304 Args: 

305 obj: The ShaFile object 

306 

307 Returns: 

308 The property value 

309 """ 

310 return getattr(obj, "_" + name) 

311 

312 return property(get, set, doc=docstring) 

313 

314 

def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
      type is not a valid type name/number.
    """
    # _TYPE_MAP is keyed on both the bytes name and the numeric type.
    return _TYPE_MAP.get(type, None)

324 

325 

def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if not valid_hexsha(hex):
        raise ObjectFormatException(f"{error_msg} {hex!r}")

337 

338 

def check_identity(identity: bytes | None, error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    A valid identity looks like ``Name <email>``: exactly one ``<``, preceded
    by a space, a ``>`` as the final byte, and no NUL or newline anywhere.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Short-circuiting `and` is required here: with the previous eager
    # all([...]) list, identity[email_start - 1] was evaluated even when
    # email_start == -1, raising IndexError instead of
    # ObjectFormatException for identities shorter than two bytes.
    if not (
        email_start >= 1
        and identity[email_start - 1] == b" "[0]
        and identity.find(b"<", email_start + 1) == -1
        and email_end == len(identity) - 1
        and b"\0" not in identity
        and b"\n" not in identity
    ):
        raise ObjectFormatException(error_msg)

363 

364 

365def _path_to_bytes(path: str | bytes) -> bytes: 

366 """Convert a path to bytes for use in error messages.""" 

367 if isinstance(path, str): 

368 return path.encode("utf-8", "surrogateescape") 

369 return path 

370 

371 

def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Prevent overflow error
    if time_seconds > MAX_TIME:
        raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

384 

385 

def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    return b" ".join(items) + b"\n"

389 

390 

class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: str | bytes) -> None:
        """Initialize FixedSha with a fixed SHA value.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        if isinstance(hexsha, str):
            hexsha = hexsha.encode("ascii")
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        # Validates the hex digits as a side effect.
        self._sha = hex_to_sha(ObjectID(hexsha))

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")

416 

417 

418# Type guard functions for runtime type narrowing 

419if TYPE_CHECKING: 

420 

421 def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]: 

422 """Check if a ShaFile is a Commit.""" 

423 return obj.type_name == b"commit" 

424 

425 def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]: 

426 """Check if a ShaFile is a Tree.""" 

427 return obj.type_name == b"tree" 

428 

429 def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]: 

430 """Check if a ShaFile is a Blob.""" 

431 return obj.type_name == b"blob" 

432 

433 def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]: 

434 """Check if a ShaFile is a Tag.""" 

435 return obj.type_name == b"tag" 

436else: 

437 # Runtime versions without type narrowing 

438 def is_commit(obj: "ShaFile") -> bool: 

439 """Check if a ShaFile is a Commit.""" 

440 return obj.type_name == b"commit" 

441 

442 def is_tree(obj: "ShaFile") -> bool: 

443 """Check if a ShaFile is a Tree.""" 

444 return obj.type_name == b"tree" 

445 

446 def is_blob(obj: "ShaFile") -> bool: 

447 """Check if a ShaFile is a Blob.""" 

448 return obj.type_name == b"blob" 

449 

450 def is_tag(obj: "ShaFile") -> bool: 

451 """Check if a ShaFile is a Tag.""" 

452 return obj.type_name == b"tag" 

453 

454 

class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha", "object_format")

    _needs_serialization: bool
    type_name: bytes
    type_num: int
    _chunked_text: list[bytes] | None
    _sha: "FixedSha | None | HASH"
    object_format: ObjectFormat

    def __init__(self) -> None:
        """Initialize a ShaFile."""
        self._sha = None
        self._chunked_text = None
        self._needs_serialization = True
        self.object_format = DEFAULT_OBJECT_FORMAT

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: "BufferedIOBase | IO[bytes] | _GitFile"
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        while end < 0:
            extra = f.read(bufsize)
            if not extra:
                # Truncated/corrupt stream: without this guard the loop would
                # spin forever waiting for a NUL that never arrives.
                raise ObjectFormatException("Invalid object header, no \\0")
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA; it is lazily recomputed in sha().
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(
        self, text: bytes, sha: "ObjectID | RawObjectID | None" = None
    ) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self,
        chunks: list[bytes],
        sha: "ObjectID | RawObjectID | None" = None,
        *,
        object_format: "ObjectFormat | None" = None,
    ) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        # Set hash algorithm if provided
        if object_format is not None:
            self.object_format = object_format
        # Set SHA before deserialization so Tree can use hash algorithm
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._deserialize(chunks)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: "BufferedIOBase | IO[bytes] | _GitFile"
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Type is encoded in bits 4-6 of the first byte.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is varint-encoded; bytes with the high bit set continue it.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True if *magic* looks like a zlib (legacy) object stream."""
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        # zlib header check: deflate method/low window bits, FCHECK multiple of 31.
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(
        cls,
        f: "BufferedIOBase | IO[bytes] | _GitFile",
        *,
        object_format: "ObjectFormat | None" = None,
    ) -> "ShaFile":
        """Read and parse an object (legacy or new style) from *f*."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            if object_format is not None:
                obj.object_format = object_format
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            if object_format is not None:
                obj.object_format = object_format
            obj._parse_object(map)
        return obj

    def _deserialize(self, chunks: list[bytes]) -> None:
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: "ObjectID | None" = None,
        *,
        object_format: "ObjectFormat | None" = None,
    ) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f, sha, object_format=object_format)

    @classmethod
    def from_file(
        cls,
        f: "BufferedIOBase | IO[bytes] | _GitFile",
        sha: "ObjectID | None" = None,
        *,
        object_format: "ObjectFormat | None" = None,
    ) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            # Validate SHA length matches hash algorithm if both provided
            if sha is not None and object_format is not None:
                expected_len = object_format.hex_length
                if len(sha) != expected_len:
                    raise ValueError(
                        f"SHA length {len(sha)} doesn't match hash algorithm "
                        f"{object_format.name} (expected {expected_len})"
                    )

            obj = cls._parse_file(f, object_format=object_format)
            if sha is not None:
                obj._sha = FixedSha(sha)
            else:
                obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int,
        string: bytes,
        sha: "ObjectID | RawObjectID | None" = None,
        *,
        object_format: "ObjectFormat | None" = None,
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
          object_format: Optional hash algorithm for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        if object_format is not None:
            obj.object_format = object_format
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int,
        chunks: list[bytes],
        sha: "ObjectID | RawObjectID | None" = None,
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the ``<type> <length>\\0`` header for this object."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self, object_format: "ObjectFormat | None" = None) -> "FixedSha | HASH":
        """The SHA object that is the name of this object.

        Args:
          object_format: Optional HashAlgorithm to use. Defaults to SHA1.
        """
        # If using a different hash algorithm, always recalculate
        if object_format is not None:
            new_sha = object_format.new_hash()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            return new_sha

        # Otherwise use cached SHA1 value
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> ObjectID:
        """The hex SHA1 of this object.

        For SHA256 repositories, use get_id(object_format) instead.
        This property always returns SHA1 for backward compatibility.
        """
        return ObjectID(self.sha().hexdigest().encode("ascii"))

    def get_id(self, object_format: "ObjectFormat | None" = None) -> bytes:
        """Get the hex SHA of this object using the specified hash algorithm.

        Args:
          object_format: Optional HashAlgorithm to use. Defaults to SHA1.

        Example:
          >>> blob = Blob()
          >>> blob.data = b"Hello, World!"
          >>> blob.id  # Always returns SHA1 for backward compatibility
          b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
          >>> blob.get_id()  # Same as .id
          b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
          >>> from dulwich.object_format import SHA256
          >>> blob.get_id(SHA256)  # Get SHA256 hash
          b'03ba204e2f2e707...'  # 64-character SHA256
        """
        return self.sha(object_format).hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

860 

861 

class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new, empty Blob object."""
        super().__init__()
        self._chunked_text = []
        # An empty blob is already in serialized form.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: "ObjectID | None" = None,
        *,
        object_format: "ObjectFormat | None" = None,
    ) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path, sha, object_format=object_format)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # Chunk boundaries need not coincide with line boundaries, so carry
        # any unterminated tail of one chunk into the next.
        remaining = None
        ret = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if len(lines) > 1:
                ret.append((remaining or b"") + lines[0])
                ret.extend(lines[1:-1])
                remaining = lines[-1]
            elif len(lines) == 1:
                if remaining is None:
                    remaining = lines.pop()
                else:
                    remaining += lines.pop()
        if remaining is not None:
            ret.append(remaining)
        return ret

967 

968 

969def _parse_message( 

970 chunks: Iterable[bytes], 

971) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]: 

972 """Parse a message with a list of fields and a body. 

973 

974 Args: 

975 chunks: the raw chunks of the tag or commit object. 

976 Returns: iterator of tuples of (field, value), one per header line, in the 

977 order read from the text, possibly including duplicates. Includes a 

978 field named None for the freeform tag/commit text. 

979 """ 

980 f = BytesIO(b"".join(chunks)) 

981 k = None 

982 v = b"" 

983 eof = False 

984 

985 def _strip_last_newline(value: bytes) -> bytes: 

986 """Strip the last newline from value.""" 

987 if value and value.endswith(b"\n"): 

988 return value[:-1] 

989 return value 

990 

991 # Parse the headers 

992 # 

993 # Headers can contain newlines. The next line is indented with a space. 

994 # We store the latest key as 'k', and the accumulated value as 'v'. 

995 for line in f: 

996 if line.startswith(b" "): 

997 # Indented continuation of the previous line 

998 v += line[1:] 

999 else: 

1000 if k is not None: 

1001 # We parsed a new header, return its value 

1002 yield (k, _strip_last_newline(v)) 

1003 if line == b"\n": 

1004 # Empty line indicates end of headers 

1005 break 

1006 (k, v) = line.split(b" ", 1) 

1007 

1008 else: 

1009 # We reached end of file before the headers ended. We still need to 

1010 # return the previous header, then we need to return a None field for 

1011 # the text. 

1012 eof = True 

1013 if k is not None: 

1014 yield (k, _strip_last_newline(v)) 

1015 yield (None, None) 

1016 

1017 if not eof: 

1018 # We didn't reach the end of file while parsing headers. We can return 

1019 # the rest of the file as a message. 

1020 yield (None, f.read()) 

1021 

1022 f.close() 

1023 

1024 

def _format_message(
    headers: Sequence[tuple[bytes, bytes]], body: bytes | None
) -> Iterator[bytes]:
    """Serialize headers and an optional body into message chunks.

    Multi-line header values are emitted as continuation lines prefixed
    with a single space, mirroring what _parse_message accepts.

    Args:
      headers: Sequence of (field, value) pairs.
      body: Optional freeform message body appended after the headers.
    Returns: iterator of serialized byte chunks.
    """
    for name, value in headers:
        first, *continuations = value.split(b"\n")
        yield git_line(name, first)
        for continuation in continuations:
            yield b" " + continuation + b"\n"
    # A blank line always separates the headers from the body.
    yield b"\n"
    if body:
        yield body

1036 

1037 

class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    # Parsed tag fields; populated by _deserialize() or via the
    # serializable_property setters below.
    _message: bytes | None
    _name: bytes | None
    _object_class: "type[ShaFile] | None"
    _object_sha: bytes | None
    _signature: bytes | None
    _tag_time: int | None
    _tag_timezone: int | None
    _tag_timezone_neg_utc: bool | None
    _tagger: bytes | None

    def __init__(self) -> None:
        """Initialize a new Tag object."""
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: bytes | None = None

    @classmethod
    def from_path(
        cls,
        filename: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Tag":
        """Read a tag from a file on disk.

        Args:
          filename: Path to the tag file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Tag object

        Raises:
          NotTagError: If the file is not a tag
        """
        tag = ShaFile.from_path(filename, sha, object_format=object_format)
        if not isinstance(tag, cls):
            raise NotTagError(_path_to_bytes(filename))
        return tag

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce the canonical header order: object, type, tag, tagger.
        # `last` tracks the previously seen header so each header can be
        # validated against its required predecessor.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

    def _serialize(self) -> list[bytes]:
        """Serialize this tag into chunks (headers plus message/signature)."""
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # A tagger without a timestamp is written bare.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        # The signature, when present, is appended verbatim after the
        # message so that raw_without_sig() can strip it again.
        if self.message is None and self._signature is None:
            body = None
        else:
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # The freeform text may end with a detached signature;
                    # split it off so message and signature are separate.
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

    def _get_object(self) -> tuple[type[ShaFile], bytes]:
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).
        """
        if self._object_class is None or self._object_sha is None:
            raise ValueError("Tag object is not properly initialized")
        return (self._object_class, self._object_sha)

    def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
        """Set the (object class, sha) this tag points at and mark dirty."""
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.signature is a signature for the returned raw byte string serialization.
        """
        # The signature is always serialized last (see _serialize), so
        # stripping its length from the end recovers the signed payload.
        ret = self.as_raw_string()
        if self._signature:
            ret = ret[: -len(self._signature)]
        return ret

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this tag.

        Returns:
          tuple of (``payload``, ``signature``, ``signature_type``) where:

          - ``payload``: The raw tag data without the signature
          - ``signature``: The signature bytes if present, None otherwise
          - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
          ObjectFormatException: If signature has unknown format
        """
        if self._signature is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Determine signature type from the armor header line.
        if self._signature.startswith(BEGIN_PGP_SIGNATURE):
            sig_type = SIGNATURE_PGP
        elif self._signature.startswith(BEGIN_SSH_SIGNATURE):
            sig_type = SIGNATURE_SSH
        else:
            raise ObjectFormatException("Unknown signature format")

        return payload, self._signature, sig_type

1292 

1293 

class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes
    mode: int
    sha: ObjectID

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Directory prefix to prepend to this entry's path.

        Returns:
          A new TreeEntry with ``path`` joined in front of this entry's path.

        Raises:
          TypeError: if this entry's path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the offending value (this entry's own path);
            # the message previously interpolated the prefix argument.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1306 

1307 

def parse_tree(
    text: bytes, sha_len: int | None = None, *, strict: bool = False
) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      sha_len: Length of the object IDs in bytes
      strict: Whether to be strict about format
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)

    while pos < end:
        # Each entry is "<mode> <name>\0<raw sha>".
        space_at = text.index(b" ", pos)
        mode_bytes = text[pos:space_at]
        # Git never writes a leading zero in the mode; reject in strict mode.
        if strict and mode_bytes.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}")
        try:
            mode = int(mode_bytes, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}") from exc
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]

        if sha_len is None:
            raise ObjectFormatException("sha_len must be specified")
        pos = nul_at + 1 + sha_len
        if pos > end:
            raise ObjectFormatException(
                f"Tree entry extends beyond tree length: {pos} > {end}"
            )

        raw_sha = text[nul_at + 1 : pos]
        if len(raw_sha) != sha_len:
            raise ObjectFormatException(
                f"Sha has invalid length: {len(raw_sha)} != {sha_len}"
            )
        yield (name, mode, sha_to_hex(RawObjectID(raw_sha)))

1352 

1353 

def serialize_tree(items: Iterable[tuple[bytes, int, ObjectID]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # Entry layout: "<octal mode> <name>\0<raw sha>".
        mode_text = format(mode, "04o").encode("ascii")
        yield mode_text + b" " + name + b"\0" + hex_to_sha(hexsha)

1365 

1366 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, ObjectID]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    sort_key = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=sort_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1390 

1391 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Git sorts directories as if their names ended with a slash.

    Args:
      entry: (name, value) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1402 

1403 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name

1407 

1408 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: ObjectID, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # NOTE(review): testing the directory bit also matches gitlink modes
    # (0o160000), which therefore print as "tree" — presumably intentional;
    # confirm before tightening.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1431 

1432 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path.

    Raised by Tree.lookup_path when a path component crosses a gitlink
    entry, so callers can resolve the remainder inside the submodule.
    """

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Initialize SubmoduleEncountered exception.

        Args:
          path: Path where the submodule was encountered
          sha: SHA of the submodule
        """
        # Deliberately no super().__init__() call: BaseException.__new__
        # already records the constructor arguments in self.args.
        self.path = path
        self.sha = sha

1445 

1446 

class Tree(ShaFile):
    """A Git tree object."""

    type_name = b"tree"
    type_num = 2

    # Single extra slot holding the name -> (mode, sha) mapping; a bare
    # string is valid for __slots__ with one member.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, ObjectID]] = {}

    @classmethod
    def from_path(
        cls,
        filename: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename, sha, object_format=object_format)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: ObjectID) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(
                b"".join(chunks),
                sha_len=self.object_format.oid_length,
            )
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, ObjectID(s)) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # The only file modes git itself writes into trees.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(
            b"".join(self._chunked_text),
            strict=True,
            sha_len=self.object_format.oid_length,
        ):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be unique and sorted in git's tree order
            # (directories compare as if suffixed with '/').
            entry = (name, (mode, ObjectID(sha)))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize entries in tree order into chunks."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if a path component crosses a gitlink entry.
          NotTreeError: if an intermediate component is not a tree.
          ValueError: if no path component resolved to an entry.
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/trailing/double slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Cannot descend into a submodule; let the caller resolve it.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1651 

1652 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    is_minus = text[:1] == b"-"
    magnitude = int(text[1:])
    if is_minus:
        magnitude = -magnitude
    # A minus sign in front of a non-negative value (e.g. "-0000" or the
    # double-negative "--700") must round-trip, so remember it.
    pointless_minus = magnitude >= 0 and is_minus
    direction = -1 if magnitude < 0 else 1
    magnitude = abs(magnitude)
    hours, minutes = divmod(magnitude, 100)
    return (
        direction * (hours * 3600 + minutes * 60),
        pointless_minus,
    )

1680 

1681 

def parse_timezone_broken(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment, accepting broken formats.

    This function handles various broken timezone formats found in the wild:
    - Missing sign prefix (e.g., '0000' instead of '+0000')
    - Double negative (e.g., '--700')

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    if text[0] not in b"+-":
        # Some (broken) commits do not have a sign; assume positive.
        text = b"+" + text
    # After sign normalization the fragment is strictly parseable, so the
    # remaining logic is shared with parse_timezone instead of duplicated
    # (the two bodies were previously identical line for line).
    return parse_timezone(text)

1715 

1716 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).

    Returns:
      Timezone rendered as bytes, e.g. b"+0100".

    Raises:
      ValueError: if offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # int(x / 3600) truncates toward zero, exactly like the historical
    # %d float formatting — this keeps quirky encodings such as b"--700"
    # (flipped positive offsets) byte-identical. Python's // modulo keeps
    # the minutes component non-negative in the same cases.
    hours = int(offset / 3600)
    minutes = (offset // 60) % 60
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")

1733 

1734 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse event.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        close_angle = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is the identity, no timestamp.
        return (value, None, (None, False))
    try:
        identity = value[: close_angle + 1]
        tail = value[close_angle + 2 :]
        time_text, tz_text = tail.rsplit(b" ", 1)
        timestamp = int(time_text)
        tz_offset, tz_neg_utc = parse_timezone(tz_text)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return identity, timestamp, (tz_offset, tz_neg_utc)

1760 

1761 

def parse_time_entry_broken(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse event, accepting broken formats.

    This function handles various broken author/committer/tagger line formats:
    - Missing angle brackets around email
    - Unsigned timezones
    - Double-negative timezones

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    match = _TIME_ENTRY_RE.match(value)
    if match is None:
        raise ObjectFormatException(f"Unable to parse time entry: {value!r}")

    tz_offset, tz_neg_utc = parse_timezone_broken(match.group("timezone"))
    return (
        match.group("person"),
        int(match.group("time")),
        (tz_offset, tz_neg_utc),
    )

1789 

1790 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event.

    Args:
      person: Identity string (name and email).
      time: Seconds since the epoch.
      timezone_info: (offset seconds, negative-UTC flag) pair.
    Returns: the serialized "person timestamp timezone" line.
    """
    tz_offset, tz_neg_utc = timezone_info
    parts = [
        person,
        str(time).encode("ascii"),
        format_timezone(tz_offset, tz_neg_utc),
    ]
    return b" ".join(parts)

1799 

1800 

def _parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    parents = []
    extra: list[tuple[bytes, bytes]] = []
    tree = None
    # (person, timestamp, (tz_offset, tz_neg_utc)); all None until parsed.
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # A mergetag header embeds a complete tag object; parse it
            # recursively (the trailing newline is restored for parsing).
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # The (None, value) sentinel from _parse_message carries the
            # freeform commit message.
            message = value
        else:
            # Unknown headers are preserved verbatim so round-tripping
            # does not lose data.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1882 

1883 

def _parse_commit_broken(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks, accepting broken formats.

    This function handles various broken author/committer line formats:
    - Missing angle brackets around email
    - Unsigned timezones
    - Double-negative timezones

    Differs from _parse_commit only in using parse_time_entry_broken for
    the author and committer headers.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    parents = []
    extra: list[tuple[bytes, bytes]] = []
    tree = None
    # (person, timestamp, (tz_offset, tz_neg_utc)); all None until parsed.
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            # Lenient parser: tolerates malformed identity/timezone text.
            author_info = parse_time_entry_broken(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry_broken(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Embedded tag object; restore the trailing newline for parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # Freeform commit message from the (None, value) sentinel.
            message = value
        else:
            # Unknown headers are preserved verbatim in `extra`.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1970 

1971 

class Commit(ShaFile):
    """A git commit object.

    Holds the tree, parents, author/committer identities and timestamps,
    optional encoding, merge tags, GPG/SSH signature, message, and any
    unrecognized extra header fields of a single commit.
    """

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Initialize an empty Commit."""
        super().__init__()
        self._parents: list[ObjectID] = []
        self._encoding: bytes | None = None
        self._mergetag: list[Tag] = []
        self._gpgsig: bytes | None = None
        self._extra: list[tuple[bytes, bytes]] = []
        # The neg_utc flags record a literal "-0000" timezone (as opposed to
        # "+0000") so serialization can round-trip the original bytes.
        self._author_timezone_neg_utc: bool | None = False
        self._commit_timezone_neg_utc: bool | None = False

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Commit":
        """Read a commit from a file on disk.

        Args:
            path: Path to the commit file
            sha: Optional known SHA for the object
            object_format: Optional object format to use

        Returns:
            A Commit object

        Raises:
            NotCommitError: If the file is not a commit
        """
        commit = ShaFile.from_path(path, sha, object_format=object_format)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate this commit's fields from raw serialized chunks.

        Delegates header parsing to ``_parse_commit``; author/committer info
        arrive as ``(identity, time, (timezone, neg_utc))`` triples.
        """
        (
            tree,
            parents,
            author_info,
            commit_info,
            encoding,
            mergetag,
            gpgsig,
            message,
            extra,
        ) = _parse_commit(chunks)

        self._tree = tree
        self._parents = [ObjectID(p) for p in parents]
        self._encoding = encoding
        self._mergetag = mergetag
        self._gpgsig = gpgsig
        self._message = message
        self._extra = extra

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce header ordering (tree, parent*, author, committer,
        # [encoding]) by tracking the previously seen field name.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        # Clear both the slot and the property: the property setter also
        # flags the copy for reserialization.
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this commit.

        Returns:
            tuple of (``payload``, ``signature``, ``signature_type``) where:

            - ``payload``: The raw commit data without the signature
            - ``signature``: The signature bytes if present, None otherwise
            - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
            ObjectFormatException: If signature has unknown format
        """
        if self._gpgsig is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Determine signature type from the armor header line.
        if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE):
            sig_type = SIGNATURE_PGP
        elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE):
            sig_type = SIGNATURE_SSH
        else:
            raise ObjectFormatException("Unknown signature format")

        return payload, self._gpgsig, sig_type

    def _serialize(self) -> list[bytes]:
        """Serialize this commit back into its raw chunked representation.

        Emits headers in canonical order, then the message, via
        ``_format_message``.
        """
        headers = []
        assert self._tree is not None
        # _tree may hold either a Tree object or its hex id.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the embedded tag's trailing newline; parsing adds it back.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[ObjectID]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[ObjectID]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

2250 

2251 

# All concrete git object classes, used to build the type lookup table below.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps both the textual type name (b"commit", ...) and the numeric type
# to the corresponding ShaFile subclass.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP.update({cls.type_name: cls, cls.type_num: cls})

2264 

2265 

2266# Public API functions 

2267 

2268 

def parse_commit_broken(data: bytes) -> Commit:
    """Parse a commit whose author/committer lines are malformed.

    Tolerates various broken formats found in the wild:
    - Missing angle brackets around email addresses
    - Unsigned timezones (e.g., "0000" instead of "+0000")
    - Double-negative timezones (e.g., "--700")
    - Negative timestamps
    - Long/short/nonsensical timezone values

    Warning: Commits parsed with this function may not round-trip correctly
    through serialization, as the broken formatting is normalized during parsing.
    The .check() method will likely fail for commits with malformed identity fields.

    Args:
        data: Raw commit data as bytes

    Returns:
        A Commit object with normalized fields

    Example:
        >>> data = b'''tree d80c186a03f423a81b39df39dc87fd269736ca86
        ... author user@example.com 1234567890 -0500
        ... committer user@example.com 1234567890 -0500
        ...
        ... Commit message
        ... '''
        >>> commit = parse_commit_broken(data)
        >>> commit.author
        b'user@example.com'
    """
    fields = _parse_commit_broken([data])
    tree, parents, author_info, commit_info = fields[:4]
    encoding, mergetag, gpgsig, message, extra = fields[4:]

    commit = Commit()
    commit._tree = tree
    commit._parents = [ObjectID(p) for p in parents]
    commit._encoding = encoding
    commit._mergetag = mergetag
    commit._gpgsig = gpgsig
    commit._message = message
    commit._extra = extra

    # author/committer info are (identity, time, (timezone, neg_utc)) triples.
    author, author_time, author_tz = author_info
    commit._author = author
    commit._author_time = author_time
    commit._author_timezone, commit._author_timezone_neg_utc = author_tz

    committer, commit_time, commit_tz = commit_info
    commit._committer = committer
    commit._commit_time = commit_time
    commit._commit_timezone, commit._commit_timezone_neg_utc = commit_tz

    return commit

2333 

2334 

# Keep the pure-Python implementations reachable for the test suite.
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items

# Prefer the Rust-accelerated implementations when the extension module is
# available; fall back to the pure-Python versions otherwise.
try:
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    pass
else:
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs