Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1057 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25__all__ = [ 

26 "BEGIN_PGP_SIGNATURE", 

27 "BEGIN_SSH_SIGNATURE", 

28 "MAX_TIME", 

29 "OBJECT_CLASSES", 

30 "SIGNATURE_PGP", 

31 "SIGNATURE_SSH", 

32 "S_IFGITLINK", 

33 "S_ISGITLINK", 

34 "ZERO_SHA", 

35 "Blob", 

36 "Commit", 

37 "EmptyFileException", 

38 "FixedSha", 

39 "ObjectID", 

40 "RawObjectID", 

41 "ShaFile", 

42 "SubmoduleEncountered", 

43 "Tag", 

44 "Tree", 

45 "TreeEntry", 

46 "check_hexsha", 

47 "check_identity", 

48 "check_time", 

49 "filename_to_hex", 

50 "format_time_entry", 

51 "format_timezone", 

52 "git_line", 

53 "hex_to_filename", 

54 "hex_to_sha", 

55 "is_blob", 

56 "is_commit", 

57 "is_tag", 

58 "is_tree", 

59 "key_entry", 

60 "key_entry_name_order", 

61 "object_class", 

62 "object_header", 

63 "parse_commit_broken", 

64 "parse_tree", 

65 "pretty_format_tree_entry", 

66 "serializable_property", 

67 "serialize_tree", 

68 "sha_to_hex", 

69 "sorted_tree_items", 

70 "valid_hexsha", 

71] 

72 

73import binascii 

74import os 

75import posixpath 

76import re 

77import stat 

78import sys 

79import zlib 

80from collections.abc import Callable, Iterable, Iterator, Sequence 

81from hashlib import sha1 

82from io import BufferedIOBase, BytesIO 

83from typing import ( 

84 IO, 

85 TYPE_CHECKING, 

86 NamedTuple, 

87 TypeVar, 

88) 

89 

90if sys.version_info >= (3, 11): 

91 from typing import Self 

92else: 

93 from typing_extensions import Self 

94 

95from typing import NewType, TypeGuard 

96 

97from .errors import ( 

98 ChecksumMismatch, 

99 FileFormatException, 

100 NotBlobError, 

101 NotCommitError, 

102 NotTagError, 

103 NotTreeError, 

104 ObjectFormatException, 

105) 

106from .file import GitFile 

107from .object_format import DEFAULT_OBJECT_FORMAT, ObjectFormat 

108 

109if TYPE_CHECKING: 

110 from _hashlib import HASH 

111 

112 from .file import _GitFile 

113 

114# Zero SHA constants for backward compatibility - now defined below as ObjectID 

115 

116 

# Field names that may appear in a commit header
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Field names that may appear in a tag header
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# File-type bits used to mark a tree entry as a gitlink (see S_ISGITLINK)
S_IFGITLINK = 0o160000

# Deliberately permissive pattern so that various kinds of broken
# author/committer/tagger lines in commits and tags can still be matched
_TIME_ENTRY_RE = re.compile(
    b"^(?P<person>.*) (?P<time>-?[0-9]+) (?P<timezone>[+-]{0,2}[0-9]+)$"
)


# Largest value of a signed 64-bit integer
MAX_TIME = (1 << 63) - 1

# First line of an armored signature, per signature flavour
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Identifiers for the signature flavours
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"


# Object id in hexadecimal form
ObjectID = NewType("ObjectID", bytes)

# Object id in raw binary form
RawObjectID = NewType("RawObjectID", bytes)

# The all-zero SHA (40 hex digits)
ZERO_SHA: ObjectID = ObjectID(b"0" * 40)

160 

161 

class EmptyFileException(FileFormatException):
    """Raised when a file that should have content turns out to be empty."""

164 

165 

def S_ISGITLINK(m: int) -> bool:
    """Tell whether a file mode denotes a submodule (gitlink).

    Args:
      m: Mode to check
    Returns: ``True`` when the file-type bits of ``m`` equal ``S_IFGITLINK``
    """
    file_type_bits = stat.S_IFMT(m)
    return file_type_bits == S_IFGITLINK

174 

175 

176def _decompress(string: bytes) -> bytes: 

177 dcomp = zlib.decompressobj() 

178 dcomped = dcomp.decompress(string) 

179 dcomped += dcomp.flush() 

180 return dcomped 

181 

182 

def sha_to_hex(sha: RawObjectID) -> ObjectID:
    """Convert a binary sha into its hexadecimal representation.

    Raises:
      ValueError: if the hex form is neither 40 (SHA1) nor 64 (SHA256)
        characters long
    """
    encoded = binascii.hexlify(sha)
    # Both SHA1 (40 chars) and SHA256 (64 chars) are accepted
    if len(encoded) == 40 or len(encoded) == 64:
        return ObjectID(encoded)
    raise ValueError(f"Incorrect length of sha string: {encoded!r}")

190 

191 

def hex_to_sha(hex: ObjectID | str) -> RawObjectID:
    """Convert a hexadecimal sha into its binary representation.

    Raises:
      ValueError: if ``hex`` is neither 40 (SHA1) nor 64 (SHA256) characters
        long, or if ``hex`` is bytes and unhexlify rejects it with a
        TypeError
    """
    # Both SHA1 (40 chars) and SHA256 (64 chars) are accepted
    if len(hex) != 40 and len(hex) != 64:
        raise ValueError(f"Incorrect length of hexsha: {hex!r}")
    try:
        raw = binascii.unhexlify(hex)
    except TypeError as exc:
        # For bytes input a TypeError is reported as a ValueError; for any
        # other input type the TypeError propagates unchanged.
        if isinstance(hex, bytes):
            raise ValueError(exc.args[0]) from exc
        raise
    return RawObjectID(raw)

203 

204 

205def valid_hexsha(hex: bytes | str) -> bool: 

206 """Check if a hex string is a valid SHA1 or SHA256. 

207 

208 Args: 

209 hex: Hex string to validate 

210 

211 Returns: 

212 True if valid SHA1 (40 chars) or SHA256 (64 chars), False otherwise 

213 """ 

214 if len(hex) not in (40, 64): 

215 return False 

216 try: 

217 binascii.unhexlify(hex) 

218 except (TypeError, binascii.Error): 

219 return False 

220 else: 

221 return True 

222 

223 

224PathT = TypeVar("PathT", str, bytes) 

225 

226 

227def hex_to_filename(path: PathT, hex: str | bytes) -> PathT: 

228 """Takes a hex sha and returns its filename relative to the given path.""" 

229 # os.path.join accepts bytes or unicode, but all args must be of the same 

230 # type. Make sure that hex which is expected to be bytes, is the same type 

231 # as path. 

232 if isinstance(path, str): 

233 if isinstance(hex, bytes): 

234 hex_str = hex.decode("ascii") 

235 else: 

236 hex_str = hex 

237 dir_name = hex_str[:2] 

238 file_name = hex_str[2:] 

239 result = os.path.join(path, dir_name, file_name) 

240 assert isinstance(result, str) 

241 return result 

242 else: 

243 # path is bytes 

244 if isinstance(hex, str): 

245 hex_bytes = hex.encode("ascii") 

246 else: 

247 hex_bytes = hex 

248 dir_name_b = hex_bytes[:2] 

249 file_name_b = hex_bytes[2:] 

250 result_b = os.path.join(path, dir_name_b, file_name_b) 

251 assert isinstance(result_b, bytes) 

252 return result_b 

253 

254 

255def filename_to_hex(filename: str | bytes) -> str: 

256 """Takes an object filename and returns its corresponding hex sha.""" 

257 # grab the last (up to) two path components 

258 errmsg = f"Invalid object filename: {filename!r}" 

259 if isinstance(filename, str): 

260 names = filename.rsplit(os.path.sep, 2)[-2:] 

261 assert len(names) == 2, errmsg 

262 base, rest = names 

263 assert len(base) == 2 and len(rest) == 38, errmsg 

264 hex_str = base + rest 

265 hex_bytes = hex_str.encode("ascii") 

266 else: 

267 # filename is bytes 

268 sep = ( 

269 os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep 

270 ) 

271 names_b = filename.rsplit(sep, 2)[-2:] 

272 assert len(names_b) == 2, errmsg 

273 base_b, rest_b = names_b 

274 assert len(base_b) == 2 and len(rest_b) == 38, errmsg 

275 hex_bytes = base_b + rest_b 

276 hex_to_sha(ObjectID(hex_bytes)) 

277 return hex_bytes.decode("ascii") 

278 

279 

def object_header(num_type: int, length: int) -> bytes:
    """Build the header for an object of the given numeric type and length.

    The header is the type name, a space, the decimal length and a NUL byte.

    Raises:
      AssertionError: if ``num_type`` does not map to a known object class
    """
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    size_field = str(length).encode("ascii")
    return cls.type_name + b" " + size_field + b"\0"

286 

287 

288def serializable_property(name: str, docstring: str | None = None) -> property: 

289 """A property that helps tracking whether serialization is necessary.""" 

290 

291 def set(obj: "ShaFile", value: object) -> None: 

292 """Set the property value and mark the object as needing serialization. 

293 

294 Args: 

295 obj: The ShaFile object 

296 value: The value to set 

297 """ 

298 setattr(obj, "_" + name, value) 

299 obj._needs_serialization = True 

300 

301 def get(obj: "ShaFile") -> object: 

302 """Get the property value. 

303 

304 Args: 

305 obj: The ShaFile object 

306 

307 Returns: 

308 The property value 

309 """ 

310 return getattr(obj, "_" + name) 

311 

312 return property(get, set, doc=docstring) 

313 

314 

def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Look up the object class for a type name or numeric type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass registered for ``type`` in ``_TYPE_MAP``,
      or None if there is no entry for it.
    """
    # dict.get already returns None for a missing key
    return _TYPE_MAP.get(type)

324 

325 

def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Ensure that a value is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")

337 

338 

339def check_identity(identity: bytes | None, error_msg: str) -> None: 

340 """Check if the specified identity is valid. 

341 

342 This will raise an exception if the identity is not valid. 

343 

344 Args: 

345 identity: Identity string 

346 error_msg: Error message to use in exception 

347 """ 

348 if identity is None: 

349 raise ObjectFormatException(error_msg) 

350 email_start = identity.find(b"<") 

351 email_end = identity.find(b">") 

352 if not all( 

353 [ 

354 email_start >= 1, 

355 identity[email_start - 1] == b" "[0], 

356 identity.find(b"<", email_start + 1) == -1, 

357 email_end == len(identity) - 1, 

358 b"\0" not in identity, 

359 b"\n" not in identity, 

360 ] 

361 ): 

362 raise ObjectFormatException(error_msg) 

363 

364 

365def _path_to_bytes(path: str | bytes) -> bytes: 

366 """Convert a path to bytes for use in error messages.""" 

367 if isinstance(path, str): 

368 return path.encode("utf-8", "surrogateescape") 

369 return path 

370 

371 

def check_time(time_seconds: int) -> None:
    """Reject timestamps that are too large to be stored safely.

    Args:
      time_seconds: time in seconds
    Raises:
      ObjectFormatException: if ``time_seconds`` is greater than MAX_TIME
    """
    # Values up to and including MAX_TIME cannot overflow
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

384 

385 

def git_line(*items: bytes) -> bytes:
    """Join the items with single spaces and terminate with a newline."""
    joined = b" ".join(items)
    return joined + b"\n"

389 

390 

391class FixedSha: 

392 """SHA object that behaves like hashlib's but is given a fixed value.""" 

393 

394 __slots__ = ("_hexsha", "_sha") 

395 

396 def __init__(self, hexsha: str | bytes) -> None: 

397 """Initialize FixedSha with a fixed SHA value. 

398 

399 Args: 

400 hexsha: Hex SHA value as string or bytes 

401 """ 

402 if isinstance(hexsha, str): 

403 hexsha = hexsha.encode("ascii") 

404 if not isinstance(hexsha, bytes): 

405 raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}") 

406 self._hexsha = hexsha 

407 self._sha = hex_to_sha(ObjectID(hexsha)) 

408 

409 def digest(self) -> bytes: 

410 """Return the raw SHA digest.""" 

411 return self._sha 

412 

413 def hexdigest(self) -> str: 

414 """Return the hex SHA digest.""" 

415 return self._hexsha.decode("ascii") 

416 

417 

418# Type guard functions for runtime type narrowing 

419if TYPE_CHECKING: 

420 

421 def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]: 

422 """Check if a ShaFile is a Commit.""" 

423 return obj.type_name == b"commit" 

424 

425 def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]: 

426 """Check if a ShaFile is a Tree.""" 

427 return obj.type_name == b"tree" 

428 

429 def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]: 

430 """Check if a ShaFile is a Blob.""" 

431 return obj.type_name == b"blob" 

432 

433 def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]: 

434 """Check if a ShaFile is a Tag.""" 

435 return obj.type_name == b"tag" 

436else: 

437 # Runtime versions without type narrowing 

438 def is_commit(obj: "ShaFile") -> bool: 

439 """Check if a ShaFile is a Commit.""" 

440 return obj.type_name == b"commit" 

441 

442 def is_tree(obj: "ShaFile") -> bool: 

443 """Check if a ShaFile is a Tree.""" 

444 return obj.type_name == b"tree" 

445 

446 def is_blob(obj: "ShaFile") -> bool: 

447 """Check if a ShaFile is a Blob.""" 

448 return obj.type_name == b"blob" 

449 

450 def is_tag(obj: "ShaFile") -> bool: 

451 """Check if a ShaFile is a Tag.""" 

452 return obj.type_name == b"tag" 

453 

454 

class ShaFile:
    """A git SHA file.

    Holds the serialized contents of an object as a list of byte chunks,
    re-serializes lazily when ``_needs_serialization`` is set, and caches
    the hash of the contents in ``_sha``.
    """

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha", "object_format")

    # True when the chunks must be rebuilt via _serialize() before use
    _needs_serialization: bool
    type_name: bytes
    type_num: int
    # Serialized contents, or None when nothing has been set/serialized yet
    _chunked_text: list[bytes] | None
    # Cached hash object; None means "not computed yet"
    _sha: "FixedSha | None | HASH"
    object_format: ObjectFormat

    def __init__(self) -> None:
        """Initialize a ShaFile."""
        self._sha = None
        self._chunked_text = None
        self._needs_serialization = True
        self.object_format = DEFAULT_OBJECT_FORMAT

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file.

        Args:
          magic: Compressed bytes already read from ``f``
          f: File-like object to read further compressed data from
        Returns: A new, empty instance of the class named in the header
        Raises:
          ObjectFormatException: if the data ends before the NUL that
            terminates the header, if the size is not an integer, or if the
            type name is not known
        """
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            end = header.find(b"\0", start)
            start = len(header)
            if end < 0 and not extra:
                # End of input without a header terminator. Without this
                # check the loop would spin forever on a truncated object.
                raise ObjectFormatException("Invalid object header, no \\0")
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string.

        Raises:
          ObjectFormatException: if the decompressed data has no NUL byte
        """
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level to use
        Returns: Iterator over compressed chunks (header first, then contents)
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Re-serializes (and drops the cached hash) if the object was modified.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return a hash derived from this object's id."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(
        self, text: bytes, sha: ObjectID | RawObjectID | None = None
    ) -> None:
        """Set the contents of this object from a serialized string.

        Raises:
          TypeError: if ``text`` is not bytes
        """
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self,
        chunks: list[bytes],
        sha: ObjectID | RawObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> None:
        """Set the contents of this object from a list of chunks.

        Args:
          chunks: The raw uncompressed contents, as chunks
          sha: Optional known sha for the object
          object_format: Optional object format; replaces the current one
            when given
        """
        self._chunked_text = chunks
        # Set hash algorithm if provided
        if object_format is not None:
            self.object_format = object_format
        # Set SHA before deserialization so Tree can use hash algorithm
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._deserialize(chunks)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file.

        Raises:
          IndexError: if ``magic`` is empty
          ObjectFormatException: if the type number is not known
        """
        # Bits 4-6 of the first byte hold the numeric type
        num_type = (magic[0] >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text.

        Raises:
          IndexError: if the data ends inside the type/size header
        """
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        #
        # Bytes are read by index so that a truncated header raises
        # IndexError, which from_file() reports as an invalid header.
        byte = map[0]
        used = 1
        while (byte & 0x80) != 0:
            byte = map[used]
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Tell whether the data starts like a legacy (zlib stream) object.

        Raises:
          IndexError: if ``magic`` has fewer than two bytes
        """
        # Indexing (rather than ord() on a slice) makes short input raise
        # IndexError, which from_file() reports as an invalid header.
        b0 = magic[0]
        b1 = magic[1]
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(
        cls,
        f: BufferedIOBase | IO[bytes] | "_GitFile",
        *,
        object_format: ObjectFormat | None = None,
    ) -> "ShaFile":
        """Read all of ``f`` and parse it as a legacy or new style object.

        Raises:
          EmptyFileException: if ``f`` yields no data
        """
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            if object_format is not None:
                obj.object_format = object_format
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            if object_format is not None:
                obj.object_format = object_format
            obj._parse_object(map)
        return obj

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate the object from raw chunks; implemented by subclasses."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Produce the raw chunks for the object; implemented by subclasses."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f, sha, object_format=object_format)

    @classmethod
    def from_file(
        cls,
        f: BufferedIOBase | IO[bytes] | "_GitFile",
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "ShaFile":
        """Get the contents of a SHA file on disk.

        Raises:
          ObjectFormatException: if parsing raises IndexError or ValueError,
            including a ``sha`` whose length does not match ``object_format``
        """
        try:
            # Validate SHA length matches hash algorithm if both provided
            if sha is not None and object_format is not None:
                expected_len = object_format.hex_length
                if len(sha) != expected_len:
                    raise ValueError(
                        f"SHA length {len(sha)} doesn't match hash algorithm "
                        f"{object_format.name} (expected {expected_len})"
                    )

            obj = cls._parse_file(f, object_format=object_format)
            if sha is not None:
                obj._sha = FixedSha(sha)
            else:
                obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int,
        string: bytes,
        sha: ObjectID | RawObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
          object_format: Optional hash algorithm for the object
        Raises:
          AssertionError: if ``type_num`` is not a known type number
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        if object_format is not None:
            obj.object_format = object_format
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int,
        chunks: list[bytes],
        sha: ObjectID | RawObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
          object_format: Optional object format (hash algorithm) for the object.
            Required for trees in SHA-256 repositories so entry parsing uses
            the correct OID length.
        Raises:
          AssertionError: if ``type_num`` is not a known type number
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha, object_format=object_format)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            self._deserialize(self.as_raw_chunks())
            # Drop the cached hash so the id is recomputed from the contents
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the object header for this object's type and raw length."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self, object_format: ObjectFormat | None = None) -> "FixedSha | HASH":
        """The SHA object that is the name of this object.

        Args:
            object_format: Optional HashAlgorithm to use. Defaults to SHA1.
        """
        # If using a different hash algorithm, always recalculate
        if object_format is not None:
            new_sha = object_format.new_hash()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            return new_sha

        # Otherwise use cached SHA1 value
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string.

        Raises:
          AssertionError: if this object's type number is not known
        """
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        # Carry the object format over so the copy is deserialized with the
        # same format as the original, not the default one.
        return obj_class.from_raw_string(
            self.type_num,
            self.as_raw_string(),
            self.id,
            object_format=self.object_format,
        )

    @property
    def id(self) -> ObjectID:
        """The hex SHA1 of this object.

        For SHA256 repositories, use get_id(object_format) instead.
        This property always returns SHA1 for backward compatibility.
        """
        return ObjectID(self.sha().hexdigest().encode("ascii"))

    def get_id(self, object_format: ObjectFormat | None = None) -> bytes:
        """Get the hex SHA of this object using the specified hash algorithm.

        Args:
            object_format: Optional HashAlgorithm to use. Defaults to SHA1.

        Example:
            >>> blob = Blob()
            >>> blob.data = b"Hello, World!"
            >>> blob.id  # Always returns SHA1 for backward compatibility
            b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
            >>> blob.get_id()  # Same as .id
            b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
            >>> from dulwich.object_format import SHA256
            >>> blob.get_id(SHA256)  # Get SHA256 hash
            b'03ba204e2f2e707...'  # 64-character SHA256
        """
        return self.sha(object_format).hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

867 

868 

class Blob(ShaFile):
    """A Git Blob object.

    The blob's contents are kept as a list of byte chunks; ``data`` exposes
    them joined together and ``chunked`` exposes the list itself.
    """

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new, empty Blob object."""
        super().__init__()
        self._chunked_text = []
        # The chunk list is already the serialized form
        self._needs_serialization = False

    @property
    def data(self) -> bytes:
        """The text contained within the blob object."""
        return self.as_raw_string()

    @data.setter
    def data(self, data: bytes) -> None:
        self.set_raw_string(data)

    @property
    def chunked(self) -> list[bytes]:
        """The text in the blob object, as chunks (not necessarily lines)."""
        return self._chunked_text

    @chunked.setter
    def chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks
        # The contents changed, so a previously cached hash no longer
        # applies. The ``data`` setter resets it via set_raw_chunks(); do
        # the same here so ``id`` is recomputed from the new chunks.
        self._sha = None

    def _serialize(self) -> list[bytes]:
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Blob":
        """Read a blob from a file on disk.

        Args:
            path: Path to the blob file
            sha: Optional known SHA for the object
            object_format: Optional object format to use

        Returns:
            A Blob object

        Raises:
            NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path, sha, object_format=object_format)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        # Split the joined contents, not chunk by chunk: a line boundary
        # (including a "\r\n" pair) may coincide with or straddle a chunk
        # boundary, and stitching per-chunk results together merges lines
        # in that case, e.g. [b"a\n", b"b"] must give [b"a\n", b"b"].
        return b"".join(self.chunked).splitlines(True)

970 

971 

972def _parse_message( 

973 chunks: Iterable[bytes], 

974) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]: 

975 """Parse a message with a list of fields and a body. 

976 

977 Args: 

978 chunks: the raw chunks of the tag or commit object. 

979 Returns: iterator of tuples of (field, value), one per header line, in the 

980 order read from the text, possibly including duplicates. Includes a 

981 field named None for the freeform tag/commit text. 

982 """ 

983 f = BytesIO(b"".join(chunks)) 

984 k = None 

985 v = b"" 

986 eof = False 

987 

988 def _strip_last_newline(value: bytes) -> bytes: 

989 """Strip the last newline from value.""" 

990 if value and value.endswith(b"\n"): 

991 return value[:-1] 

992 return value 

993 

994 # Parse the headers 

995 # 

996 # Headers can contain newlines. The next line is indented with a space. 

997 # We store the latest key as 'k', and the accumulated value as 'v'. 

998 for line in f: 

999 if line.startswith(b" "): 

1000 # Indented continuation of the previous line 

1001 v += line[1:] 

1002 else: 

1003 if k is not None: 

1004 # We parsed a new header, return its value 

1005 yield (k, _strip_last_newline(v)) 

1006 if line == b"\n": 

1007 # Empty line indicates end of headers 

1008 break 

1009 (k, v) = line.split(b" ", 1) 

1010 

1011 else: 

1012 # We reached end of file before the headers ended. We still need to 

1013 # return the previous header, then we need to return a None field for 

1014 # the text. 

1015 eof = True 

1016 if k is not None: 

1017 yield (k, _strip_last_newline(v)) 

1018 yield (None, None) 

1019 

1020 if not eof: 

1021 # We didn't reach the end of file while parsing headers. We can return 

1022 # the rest of the file as a message. 

1023 yield (None, f.read()) 

1024 

1025 f.close() 

1026 

1027 

1028def _format_message( 

1029 headers: Sequence[tuple[bytes, bytes]], body: bytes | None 

1030) -> Iterator[bytes]: 

1031 for field, value in headers: 

1032 lines = value.split(b"\n") 

1033 yield git_line(field, lines[0]) 

1034 for line in lines[1:]: 

1035 yield b" " + line + b"\n" 

1036 yield b"\n" # There must be a new line after the headers 

1037 if body: 

1038 yield body 

1039 

1040 

class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    _message: bytes | None
    _name: bytes | None
    _object_class: "type[ShaFile] | None"
    _object_sha: bytes | None
    _signature: bytes | None
    _tag_time: int | None
    _tag_timezone: int | None
    _tag_timezone_neg_utc: bool | None
    _tagger: bytes | None

    def __init__(self) -> None:
        """Create a tag with no tagger, timestamp, timezone or signature."""
        super().__init__()
        self._signature = None
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False

    @classmethod
    def from_path(
        cls,
        filename: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Tag":
        """Load a tag object from a file on disk.

        Args:
          filename: Path to the tag file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Tag object

        Raises:
          NotTagError: If the file does not contain a tag
        """
        loaded = ShaFile.from_path(filename, sha, object_format=object_format)
        if isinstance(loaded, cls):
            return loaded
        raise NotTagError(_path_to_bytes(filename))

    def check(self) -> None:
        """Check this object for internal consistency.

        Beyond the parent class checks, this requires an object sha, an
        object type, a non-empty name and a tag time, validates the tagger
        identity when one is present, and verifies that the object, type,
        tag and tagger headers appear in that order.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        for attr, problem in (
            ("_object_sha", "missing object sha"),
            ("_object_class", "missing object type"),
            ("_name", "missing tag name"),
        ):
            self._check_has_member(attr, problem)

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Each known header must directly follow a specific header (the
        # object header must come first, i.e. follow nothing at all).
        required_predecessor = {
            _OBJECT_HEADER: (None, "unexpected object"),
            _TYPE_HEADER: (_OBJECT_HEADER, "unexpected type"),
            _TAG_HEADER: (_TYPE_HEADER, "unexpected tag name"),
            _TAGGER_HEADER: (_TAG_HEADER, "unexpected tagger"),
        }
        previous = None
        for field, _ in _parse_message(self._chunked_text):
            rule = required_predecessor.get(field)
            if rule is not None and previous != rule[0]:
                raise ObjectFormatException(rule[1])
            previous = field

    def _serialize(self) -> list[bytes]:
        """Build the raw chunks of this tag from its fields.

        Raises:
          ObjectFormatException: if the object sha, object class or name is
            missing, or if a tag time is set without timezone information
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))

        if self._tagger:
            # Without a timestamp the tagger is written out verbatim.
            tagger_value = self._tagger
            if self._tag_time is not None:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                tagger_value = format_time_entry(
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                )
            headers.append((_TAGGER_HEADER, tagger_value))

        message = self.message
        signature = self._signature
        body = None
        if message is not None or signature is not None:
            # The signature, when present, trails the message text.
            body = (message or b"") + (signature or b"")
        return list(_format_message(headers, body))

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate the tag fields from the raw chunks.

        Raises:
          ObjectFormatException: on an unknown object type or header field
        """
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field is None:
                if value is None:
                    self._message = None
                    self._signature = None
                    continue
                # Look for a PGP signature first, then for an SSH one.
                split_at = value.find(BEGIN_PGP_SIGNATURE)
                if split_at == -1:
                    split_at = value.find(BEGIN_SSH_SIGNATURE)
                if split_at == -1:
                    self._message = value
                    self._signature = None
                else:
                    self._message = value[:split_at]
                    self._signature = value[split_at:]
            elif field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

    @property
    def object(self) -> tuple[type[ShaFile], ObjectID]:
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).

        Raises:
          ValueError: if the object class or sha has not been set
        """
        if self._object_class is None or self._object_sha is None:
            raise ValueError("Tag object is not properly initialized")
        return (self._object_class, ObjectID(self._object_sha))

    @object.setter
    def object(self, value: tuple[type[ShaFile], bytes]) -> None:
        self._object_class, self._object_sha = value
        self._needs_serialization = True

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag.  As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.signature is a signature for the returned raw byte string serialization.
        """
        raw = self.as_raw_string()
        if not self._signature:
            return raw
        # The signature is the tail of the serialized form; cut it off.
        return raw[: -len(self._signature)]

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this tag.

        Returns:
          tuple of (``payload``, ``signature``, ``signature_type``) where:

          - ``payload``: The raw tag data without the signature
          - ``signature``: The signature bytes if present, None otherwise
          - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
          ObjectFormatException: If signature has unknown format
        """
        signature = self._signature
        if signature is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # The signature type is identified by its leading marker.
        if signature.startswith(BEGIN_PGP_SIGNATURE):
            return payload, signature, SIGNATURE_PGP
        if signature.startswith(BEGIN_SSH_SIGNATURE):
            return payload, signature, SIGNATURE_SSH
        raise ObjectFormatException("Unknown signature format")

1295 

1296 

class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes
    mode: int
    sha: ObjectID

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Directory path to join in front of this entry's path
        Returns: a new TreeEntry with the joined path and the same mode and sha

        Raises:
          TypeError: if this entry's own path is not a bytes object
        """
        if not isinstance(self.path, bytes):
            # Report the value that actually failed the check (the entry's
            # own path), not the prefix passed in by the caller.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1309 

1310 

1311def parse_tree( 

1312 text: bytes, sha_len: int | None = None, *, strict: bool = False 

1313) -> Iterator[tuple[bytes, int, bytes]]: 

1314 """Parse a tree text. 

1315 

1316 Args: 

1317 text: Serialized text to parse 

1318 sha_len: Length of the object IDs in bytes 

1319 strict: Whether to be strict about format 

1320 Returns: iterator of tuples of (name, mode, sha) 

1321 

1322 Raises: 

1323 ObjectFormatException: if the object was malformed in some way 

1324 """ 

1325 count = 0 

1326 length = len(text) 

1327 

1328 while count < length: 

1329 mode_end = text.index(b" ", count) 

1330 mode_text = text[count:mode_end] 

1331 if strict and mode_text.startswith(b"0"): 

1332 raise ObjectFormatException(f"Invalid mode {mode_text!r}") 

1333 try: 

1334 mode = int(mode_text, 8) 

1335 except ValueError as exc: 

1336 raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc 

1337 name_end = text.index(b"\0", mode_end) 

1338 name = text[mode_end + 1 : name_end] 

1339 

1340 if sha_len is None: 

1341 raise ObjectFormatException("sha_len must be specified") 

1342 count = name_end + 1 + sha_len 

1343 if count > length: 

1344 raise ObjectFormatException( 

1345 f"Tree entry extends beyond tree length: {count} > {length}" 

1346 ) 

1347 

1348 sha = text[name_end + 1 : count] 

1349 if len(sha) != sha_len: 

1350 raise ObjectFormatException( 

1351 f"Sha has invalid length: {len(sha)} != {sha_len}" 

1352 ) 

1353 hexsha = sha_to_hex(RawObjectID(sha)) 

1354 yield (name, mode, hexsha) 

1355 

1356 

def serialize_tree(items: Iterable[tuple[bytes, int, ObjectID]]) -> Iterator[bytes]:
    """Turn tree items into their serialized form.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks, one chunk per item
    """
    for name, mode, hexsha in items:
        # Layout of one entry: "<octal mode> <name>\0<raw sha>"
        mode_text = f"{mode:04o}".encode("ascii")
        yield mode_text + b" " + name + b"\0" + hex_to_sha(hexsha)

1368 

1369 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, ObjectID]], name_order: bool
) -> Iterator[TreeEntry]:
    """Yield the entries of a tree dictionary in sorted order.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)

    Raises:
      TypeError: if the sha of an entry is not a bytes object
    """
    order_key = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=order_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1393 

1394 

def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Compute the tree-order sort key of an entry.

    Directory entries sort as if their name had a trailing '/'.

    Args:
      entry: (name, value) tuple, where value is a (mode, sha) tuple
    Returns: the name, with b"/" appended when the mode is a directory
    """
    name, (mode, _) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1405 

1406 

def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Compute the plain name-order sort key of an entry.

    Args:
      entry: (name, value) tuple
    Returns: the name, unchanged
    """
    name = entry[0]
    return name

1410 

1411 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: ObjectID, encoding: str = "utf-8"
) -> str:
    """Render a tree entry as one human-readable line.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name; undecodable bytes are
        replaced
    Returns: string describing the tree entry, terminated by a newline
    """
    # Any mode with the directory bit set is reported as a tree.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1434 

1435 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record where the submodule was found.

        Args:
          path: Path where the submodule was encountered
          sha: SHA of the submodule
        """
        self.path, self.sha = path, sha

1448 

1449 

class Tree(ShaFile):
    """A Git tree object."""

    type_name = b"tree"
    type_num = 2

    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, ObjectID]] = {}

    @classmethod
    def from_path(
        cls,
        filename: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename, sha, object_format=object_format)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get the (mode, hexsha) tuple of the entry with the given name.

        Raises:
          KeyError: if there is no entry with that name
        """
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as bytes.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry
            as bytes.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name.

        Raises:
          KeyError: if there is no entry with that name
        """
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: ObjectID) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as bytes.
          hexsha: The hex SHA of the entry as bytes.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree.

        Raises:
          ObjectFormatException: if the serialized tree is malformed
        """
        # parse_tree may be a lazy generator, in which case parsing errors
        # only surface while it is being iterated. The iteration therefore
        # has to happen inside the try block, otherwise a ValueError (e.g. a
        # missing delimiter) escapes without being converted.
        try:
            entries = {
                name: (mode, ObjectID(hexsha))
                for name, mode, hexsha in parse_tree(
                    b"".join(chunks),
                    sha_len=self.object_format.oid_length,
                )
            }
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        self._entries = entries

    def check(self) -> None:
        """Check this object for internal consistency.

        Every entry must have a valid sha, a name that is not empty, '.',
        '..' or '.git' and contains no '/', and one of the modes git allows.
        Entries must be unique and sorted in tree order.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(
            b"".join(self._chunked_text),
            strict=True,
            sha_len=self.object_format.oid_length,
        ):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            entry = (name, (mode, ObjectID(sha)))
            if last is not None:
                # Compare against the raw, on-disk order of the entries.
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize the entries, in tree order, into raw chunks."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if the path descends into a submodule entry
          NotTreeError: if an intermediate path component is not a tree
          KeyError: if a path component does not exist in its parent tree
          ValueError: if the path has no non-empty component (e.g. b"/")
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            # Empty components (leading, trailing or doubled '/') are skipped.
            if not p:
                continue
            if mode is not None and S_ISGITLINK(mode):
                # The previous component was a submodule; its contents are
                # not reachable through this repository's trees.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1654 

1655 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
      and a boolean indicating whether this was a UTC timezone
      prefixed with a negative sign (-0000).

    Raises:
      ValueError: if the text is empty, does not start with '+' or '-',
        or the part after the sign is not an integer
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    #
    # Slicing (rather than indexing) makes empty input fail the sign check
    # with a ValueError instead of escaping as an IndexError.
    sign = text[:1]
    if sign not in (b"+", b"-"):
        raise ValueError(f"Timezone must start with + or - ({text!r})")
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # True for e.g. '-0000' or '--700': a minus sign on a non-negative offset.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    # The digits encode HHMM; integer arithmetic keeps this exact for any size.
    hours, minutes = divmod(abs(offset), 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1683 

1684 

def parse_timezone_broken(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment, accepting broken formats.

    This function handles various broken timezone formats found in the wild:
    - Missing sign prefix (e.g., '0000' instead of '+0000')
    - Double negative (e.g., '--700')

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
      and a boolean indicating whether this was a UTC timezone
      prefixed with a negative sign (-0000).

    Raises:
      ValueError: if the part after the (possibly implied) sign is not an
        integer; this includes empty input
    """
    # Slicing (rather than indexing) keeps empty input from raising an
    # IndexError; it is rejected by int() below with a ValueError instead.
    if text[:1] not in (b"+", b"-"):
        # Some (broken) commits do not have a sign
        text = b"+" + text

    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # True for e.g. '-0000' or '--700': a minus sign on a non-negative offset.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    # The digits encode HHMM; integer arithmetic keeps this exact for any size.
    hours, minutes = divmod(abs(offset), 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1718 

1719 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Render a timezone offset the way Git serializes it.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    Returns: the sign followed by the hours and minutes of the offset

    Raises:
      ValueError: if the offset is not a whole number of minutes
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    sign = "+"
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        # For a positive offset with a forced minus sign this makes the
        # offset negative, which is what yields the second '-' in '--700'.
        offset = -offset
    # Truncate toward zero (not floor), so negative values keep their digits.
    hours = int(offset / 3600)
    minutes = int((offset / 60) % 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")

1736 

1737 

1738def parse_time_entry( 

1739 value: bytes, 

1740) -> tuple[bytes, int | None, tuple[int | None, bool]]: 

1741 """Parse event. 

1742 

1743 Args: 

1744 value: Bytes representing a git commit/tag line 

1745 Raises: 

1746 ObjectFormatException in case of parsing error (malformed 

1747 field date) 

1748 Returns: Tuple of (author, time, (timezone, timezone_neg_utc)) 

1749 """ 

1750 try: 

1751 sep = value.rindex(b"> ") 

1752 except ValueError: 

1753 return (value, None, (None, False)) 

1754 try: 

1755 person = value[0 : sep + 1] 

1756 rest = value[sep + 2 :] 

1757 timetext, timezonetext = rest.rsplit(b" ", 1) 

1758 time = int(timetext) 

1759 timezone, timezone_neg_utc = parse_timezone(timezonetext) 

1760 except ValueError as exc: 

1761 raise ObjectFormatException(exc) from exc 

1762 return person, time, (timezone, timezone_neg_utc) 

1763 

1764 

def parse_time_entry_broken(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Split an identity line into its parts, tolerating broken formats.

    This function handles various broken author/committer/tagger line formats:
    - Missing angle brackets around email
    - Unsigned timezones
    - Double-negative timezones

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    match = _TIME_ENTRY_RE.match(value)
    if match is None:
        raise ObjectFormatException(f"Unable to parse time entry: {value!r}")

    person, timetext, timezonetext = match.group("person", "time", "timezone")
    timestamp = int(timetext)
    timezone, timezone_neg_utc = parse_timezone_broken(timezonetext)
    return person, timestamp, (timezone, timezone_neg_utc)

1792 

1793 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Join a person, a timestamp and a timezone into one identity line.

    Args:
      person: The identity to put first
      time: The timestamp, written in decimal
      timezone_info: Tuple of (timezone, timezone_neg_utc) passed on to
        format_timezone
    Returns: the three parts separated by single spaces
    """
    offset, neg_utc = timezone_info
    timestamp = str(time).encode("ascii")
    return b" ".join((person, timestamp, format_timezone(offset, neg_utc)))

1802 

1803 

def _parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Extract the fields of a commit object from its raw chunks.

    Headers are accepted in any order. Headers that are not recognized are
    collected, in order, in the ``extra`` list.

    Args:
      chunks: Chunks to parse
    Raises:
      ObjectFormatException: if a header that needs a value has none
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    tree = None
    encoding = None
    gpgsig = None
    message = None
    parents = []
    mergetag = []
    extra: list[tuple[bytes, bytes]] = []
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field is None:
            # The None field carries the freeform commit message.
            message = value
        elif field == _TREE_HEADER:
            tree = value
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # The header value lost its final newline during parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        else:
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))

    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1885 

1886 

def _parse_commit_broken(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Extract the fields of a commit object, accepting broken formats.

    This function handles various broken author/committer line formats:
    - Missing angle brackets around email
    - Unsigned timezones
    - Double-negative timezones

    Headers are accepted in any order. Headers that are not recognized are
    collected, in order, in the ``extra`` list.

    Args:
      chunks: Chunks to parse
    Raises:
      ObjectFormatException: if a header that needs a value has none
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    tree = None
    encoding = None
    gpgsig = None
    message = None
    parents = []
    mergetag = []
    extra: list[tuple[bytes, bytes]] = []
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field is None:
            # The None field carries the freeform commit message.
            message = value
        elif field == _TREE_HEADER:
            tree = value
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            # The lenient parser is the only difference from _parse_commit.
            author_info = parse_time_entry_broken(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry_broken(value)
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # The header value lost its final newline during parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        else:
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))

    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1973 

1974 

class Commit(ShaFile):
    """Git object of type ``commit``.

    Holds a tree reference, a list of parent ids, author and committer
    identities with their timestamps and timezones, the message, and the
    optional ``encoding``, ``mergetag``, ``gpgsig`` and extra headers.
    """

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Create a commit with empty parents, mergetags and extra headers."""
        super().__init__()
        # Only the list-valued, optional and flag fields get defaults here;
        # the remaining slots are not assigned by this constructor.
        self._parents: list[ObjectID] = []
        self._mergetag: list[Tag] = []
        self._extra: list[tuple[bytes, bytes]] = []
        self._encoding: bytes | None = None
        self._gpgsig: bytes | None = None
        self._author_timezone_neg_utc: bool | None = False
        self._commit_timezone_neg_utc: bool | None = False

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Commit":
        """Load a commit object from a file on disk.

        Args:
          path: Location of the object file
          sha: SHA of the object, if already known
          object_format: Object format to use, if any

        Returns:
          The loaded Commit

        Raises:
          NotCommitError: If the loaded object is not an instance of this class
        """
        loaded = ShaFile.from_path(path, sha, object_format=object_format)
        if isinstance(loaded, cls):
            return loaded
        raise NotCommitError(_path_to_bytes(path))

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Fill in this commit's fields from its serialized chunks."""
        (
            self._tree,
            parent_ids,
            author_entry,
            committer_entry,
            self._encoding,
            self._mergetag,
            self._gpgsig,
            self._message,
            self._extra,
        ) = _parse_commit(chunks)
        self._parents = [ObjectID(parent_id) for parent_id in parent_ids]

        # Each entry has the shape (identity, timestamp, (timezone, neg_utc)).
        self._author, self._author_time, author_zone = author_entry
        self._author_timezone, self._author_timezone_neg_utc = author_zone
        self._committer, self._commit_time, committer_zone = committer_entry
        self._commit_timezone, self._commit_timezone_neg_utc = committer_zone

    def check(self) -> None:
        """Validate the fields of this commit and the order of its headers.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None

        required_members = (
            ("_tree", "missing tree"),
            ("_author", "missing author"),
            ("_committer", "missing committer"),
            ("_author_time", "missing author time"),
            ("_commit_time", "missing commit time"),
        )
        for member, error_msg in required_members:
            self._check_has_member(member, error_msg)

        for parent_sha in self._parents:
            check_hexsha(parent_sha, "invalid parent sha")
        # The asserts below restate what _check_has_member verified above.
        assert self._tree is not None
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None
        assert self._committer is not None
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None
        assert self._commit_time is not None
        check_time(self._author_time)
        check_time(self._commit_time)

        # Walk the raw headers and reject any that follow the wrong
        # predecessor: tree first, then parents, author, committer, encoding.
        previous = None
        for header, _ in _parse_message(self._chunked_text):
            if header == _TREE_HEADER and previous is not None:
                raise ObjectFormatException("unexpected tree")
            if header == _PARENT_HEADER and previous not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            if header == _AUTHOR_HEADER and previous not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            if header == _COMMITTER_HEADER and previous != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            if header == _ENCODING_HEADER and previous != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            previous = header

        # TODO: optionally check for duplicate parents

    def raw_without_sig(self) -> bytes:
        """Serialize this commit with its GPG/SSH signature removed.

        The returned bytes are the payload that ``self.gpgsig`` signs.
        """
        unsigned = self.copy()
        assert isinstance(unsigned, Commit)
        # Clear the slot directly, then assign through the public property too.
        unsigned._gpgsig = None
        unsigned.gpgsig = None
        return unsigned.as_raw_string()

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Split this commit into payload, signature and signature type.

        Returns:
          tuple of (``payload``, ``signature``, ``signature_type``) where:

          - ``payload``: the raw commit data without the signature
          - ``signature``: the signature bytes, or None when unsigned
          - ``signature_type``: SIGNATURE_PGP, SIGNATURE_SSH, or None when
            unsigned

        Raises:
          ObjectFormatException: If signature has unknown format
        """
        signature = self._gpgsig
        if signature is None:
            return self.as_raw_string(), None, None

        # The payload is computed before the signature prefix is inspected.
        payload = self.raw_without_sig()

        if signature.startswith(BEGIN_PGP_SIGNATURE):
            return payload, signature, SIGNATURE_PGP
        if signature.startswith(BEGIN_SSH_SIGNATURE):
            return payload, signature, SIGNATURE_SSH
        raise ObjectFormatException("Unknown signature format")

    def _serialize(self) -> list[bytes]:
        """Build the serialized chunks for this commit.

        Headers are emitted as tree, parents, author, committer, encoding,
        mergetags, extra headers, then gpgsig.
        """
        assert self._tree is not None
        tree_id = self._tree.id if isinstance(self._tree, Tree) else self._tree
        header_lines = [(_TREE_HEADER, tree_id)]
        header_lines.extend((_PARENT_HEADER, parent) for parent in self._parents)

        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        author_line = format_time_entry(
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        )
        header_lines.append((_AUTHOR_HEADER, author_line))

        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        committer_line = format_time_entry(
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        )
        header_lines.append((_COMMITTER_HEADER, committer_line))

        if self.encoding:
            header_lines.append((_ENCODING_HEADER, self.encoding))
        # The last byte of each serialized tag is dropped before embedding.
        header_lines.extend(
            (_MERGETAG_HEADER, tag.as_raw_string()[:-1]) for tag in self.mergetag
        )
        header_lines += [
            (name, content) for name, content in self._extra if content is not None
        ]
        if self.gpgsig:
            header_lines.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(header_lines, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    @property
    def parents(self) -> list[ObjectID]:
        """Parents of this commit, by their SHA1."""
        return self._parents

    @parents.setter
    def parents(self, value: list[ObjectID]) -> None:
        """Replace the parent list and mark the commit for re-serialization."""
        self._needs_serialization = True
        self._parents = value

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. "
        "As the number of seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

2249 

2250 

# Object classes that get registered in _TYPE_MAP below.
OBJECT_CLASSES = (Commit, Tree, Blob, Tag)

# Maps both the byte type name and the numeric type code of each class in
# OBJECT_CLASSES to that class.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP.update({cls.type_name: cls, cls.type_num: cls})

2263 

2264 

2265# Public API functions 

2266 

2267 

def parse_commit_broken(data: bytes) -> Commit:
    """Build a Commit from raw data whose author/committer lines are malformed.

    Accepts several broken formats found in the wild:
    - Missing angle brackets around email addresses
    - Unsigned timezones (e.g., "0000" instead of "+0000")
    - Double-negative timezones (e.g., "--700")
    - Negative timestamps
    - Long/short/nonsensical timezone values

    Warning: the broken formatting is normalized during parsing, so a commit
    produced here may not round-trip correctly through serialization, and its
    .check() method will likely fail when identity fields are malformed.

    Args:
      data: Raw commit data as bytes

    Returns:
      A Commit object with normalized fields

    Example:
      >>> data = b'''tree d80c186a03f423a81b39df39dc87fd269736ca86
      ... author user@example.com 1234567890 -0500
      ... committer user@example.com 1234567890 -0500
      ...
      ... Commit message
      ... '''
      >>> commit = parse_commit_broken(data)
      >>> commit.author
      b'user@example.com'
    """
    result = Commit()
    # The whole input is handed to the lenient parser as a single chunk.
    (
        result._tree,
        parent_ids,
        author_entry,
        committer_entry,
        result._encoding,
        result._mergetag,
        result._gpgsig,
        result._message,
        result._extra,
    ) = _parse_commit_broken([data])
    result._parents = [ObjectID(parent_id) for parent_id in parent_ids]

    # Each entry has the shape (identity, timestamp, (timezone, neg_utc)).
    result._author, result._author_time, author_zone = author_entry
    result._author_timezone, result._author_timezone_neg_utc = author_zone
    result._committer, result._commit_time, committer_zone = committer_entry
    result._commit_timezone, result._commit_timezone_neg_utc = committer_zone

    return result

2332 

2333 

# Keep references to the pure-Python implementations so tests can still
# reach them after the module-level names are rebound below.
_parse_tree_py, _sorted_tree_items_py = parse_tree, sorted_tree_items
try:
    # Prefer the Rust implementations when the extension module is importable
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Extension unavailable: the pure-Python versions stay in place
    pass
else:
    parse_tree, sorted_tree_items = _parse_tree_rs, _sorted_tree_items_rs