Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

965 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as published by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25import binascii 

26import os 

27import posixpath 

28import stat 

29import sys 

30import zlib 

31from collections import namedtuple 

32from collections.abc import Callable, Iterable, Iterator 

33from hashlib import sha1 

34from io import BufferedIOBase, BytesIO 

35from typing import ( 

36 IO, 

37 TYPE_CHECKING, 

38 Optional, 

39 Union, 

40) 

41 

42if sys.version_info >= (3, 11): 

43 from typing import Self 

44else: 

45 from typing_extensions import Self 

46 

47if sys.version_info >= (3, 10): 

48 from typing import TypeGuard 

49else: 

50 from typing_extensions import TypeGuard 

51 

52from . import replace_me 

53from .errors import ( 

54 ChecksumMismatch, 

55 FileFormatException, 

56 NotBlobError, 

57 NotCommitError, 

58 NotTagError, 

59 NotTreeError, 

60 ObjectFormatException, 

61) 

62from .file import GitFile 

63 

64if TYPE_CHECKING: 

65 from _hashlib import HASH 

66 

67 from .file import _GitFile 

68 

# The all-zero sha git uses to denote a nonexistent object (e.g. ref deletion).
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# File-mode value marking a tree entry as a submodule (gitlink).
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"


# Hex-encoded object id, as bytes (40 ASCII hex digits).
ObjectID = bytes

96 

97 

class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""

100 

101 

def S_ISGITLINK(m: int) -> bool:
    """Return True when file mode *m* denotes a gitlink (submodule) entry.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    return stat.S_IFMT(m) == S_IFGITLINK

110 

111 

def _decompress(string: bytes) -> bytes:
    """Inflate a complete zlib-compressed byte string."""
    inflater = zlib.decompressobj()
    return inflater.decompress(string) + inflater.flush()

117 

118 

def sha_to_hex(sha: ObjectID) -> bytes:
    """Return the 40-byte hex encoding of a 20-byte binary sha digest."""
    hexsha = binascii.hexlify(sha)
    assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}"
    return hexsha

124 

125 

def hex_to_sha(hex: Union[bytes, str]) -> bytes:
    """Convert a 40-character hex sha into its 20-byte binary form."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        return binascii.unhexlify(hex)
    except TypeError as exc:
        if isinstance(hex, bytes):
            # Historical contract: bad bytes input surfaces as ValueError
            # rather than TypeError.
            raise ValueError(exc.args[0]) from exc
        raise

135 

136 

def valid_hexsha(hex: Union[bytes, str]) -> bool:
    """Return True when *hex* is a parseable 40-character hex sha."""
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
    except (TypeError, binascii.Error):
        return False
    return True

146 

147 

def hex_to_filename(
    path: Union[str, bytes], hex: Union[str, bytes]
) -> Union[str, bytes]:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join requires all of its arguments to share one type; hex
    # normally arrives as bytes, so coerce it when the path is a str.
    if type(path) is not type(hex) and isinstance(path, str):
        hex = hex.decode("ascii")  # type: ignore
    # Objects live under a two-character fan-out directory.
    return os.path.join(path, hex[:2], hex[2:])  # type: ignore

161 

162 

def filename_to_hex(filename: Union[str, bytes]) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # Objects are stored as <dir>/<2 hex chars>/<38 hex chars>; recover the
    # sha from the final two path components.
    names = filename.rsplit(os.path.sep, 2)[-2:]  # type: ignore
    errmsg = f"Invalid object filename: {filename!r}"
    assert len(names) == 2, errmsg
    base, rest = names
    assert len(base) == 2 and len(rest) == 38, errmsg
    hex_bytes = (base + rest).encode("ascii")  # type: ignore
    hex_to_sha(hex_bytes)  # validation only; raises on non-hex content
    return hex_bytes.decode("ascii")

174 

175 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    # "<type> <decimal length>\0", e.g. b"blob 12\x00".
    return cls.type_name + b" %d\x00" % length

182 

183 

def serializable_property(name: str, docstring: Optional[str] = None) -> property:
    """A property that helps tracking whether serialization is necessary.

    Reads proxy the backing ``_<name>`` attribute; writes additionally flip
    ``_needs_serialization`` so the cached serialization is rebuilt.
    """
    attr = "_" + name

    def _getter(obj: "ShaFile") -> object:
        return getattr(obj, attr)

    def _setter(obj: "ShaFile", value: object) -> None:
        setattr(obj, attr, value)
        obj._needs_serialization = True

    return property(_getter, _setter, doc=docstring)

195 

196 

def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)

206 

207 

def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")

219 

220 

def check_identity(identity: Optional[bytes], error_msg: str) -> None:
    """Check if the specified identity is valid.

    A valid identity looks like ``Name <email>``: at least one character of
    name, a space, then an email enclosed in exactly one pair of angle
    brackets terminating the string, with no NUL or newline bytes anywhere.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is missing or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Use short-circuiting checks: the old eager ``all([...])`` form
    # evaluated identity[email_start - 1] even when find() returned -1,
    # raising IndexError (instead of ObjectFormatException) for inputs
    # like b"" or a single-byte identity.
    if (
        email_start < 1
        or identity[email_start - 1 : email_start] != b" "
        or identity.find(b"<", email_start + 1) != -1
        or email_end != len(identity) - 1
        or b"\0" in identity
        or b"\n" in identity
    ):
        raise ObjectFormatException(error_msg)

245 

246 

def _path_to_bytes(path: Union[str, bytes]) -> bytes:
    """Convert a path to bytes for use in error messages."""
    if isinstance(path, bytes):
        return path
    # surrogateescape round-trips undecodable filesystem bytes.
    return path.encode("utf-8", "surrogateescape")

252 

253 

def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    Args:
      time_seconds: time in seconds
    Raises:
      ObjectFormatException: when the value exceeds the signed 64-bit range
        git can represent.
    """
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

266 

267 

def git_line(*items: bytes) -> bytes:
    """Join *items* with single spaces and terminate with a newline."""
    return b"%s\n" % (b" ".join(items),)

271 

272 

class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: Union[str, bytes]) -> None:
        """Store *hexsha* (str or bytes) and its binary form.

        Raises:
          TypeError: if hexsha is neither str nor bytes.
        """
        if isinstance(hexsha, str):
            hexsha = hexsha.encode("ascii")
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        # hex_to_sha also validates the 40-character hex format.
        self._sha = hex_to_sha(hexsha)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")

293 

294 

# Type guard functions for runtime type narrowing
if TYPE_CHECKING:
    # Static-analysis variants: TypeGuard lets a type checker narrow a
    # ShaFile to the concrete subclass after a successful check. The
    # runtime bodies below are identical; only the annotations differ.

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"

330 

331 

class ShaFile:
    """A git SHA file.

    Abstract base for git object types. Subclasses define ``type_name`` and
    ``type_num`` and implement ``_serialize``/``_deserialize``. Instances
    cache their serialized form in ``_chunked_text`` and their SHA in
    ``_sha``; ``_needs_serialization`` marks the cache as stale.
    """

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when attribute changes have not yet been re-serialized.
    _needs_serialization: bool
    # Canonical git type name (e.g. b"blob"); set by subclasses.
    type_name: bytes
    # Numeric git type; set by subclasses.
    type_num: int
    # Cached serialization as a list of byte chunks (not one-per-line).
    _chunked_text: Optional[list[bytes]]
    # Cached SHA object (real hashlib hash, FixedSha, or None when stale).
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the NUL terminating "<type> <size>" appears.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the header NUL is the object content.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level (-1 = zlib default).
        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA before re-serializing.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        return self._chunked_text  # type: ignore

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> None:
        """Set the contents of this object from a list of chunks.

        Args:
          chunks: serialized content as byte chunks
          sha: optional known hex sha; when given, it is trusted and cached
            as a FixedSha instead of being recomputed.
        """
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Type number is encoded in bits 4-6 of the first byte.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is a varint: continue while the high bit is set.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True when *magic* starts with a zlib stream header.

        Per RFC 1950: low nibble 8 = deflate method, and the first two
        bytes as a big-endian word are divisible by 31 (FCHECK).
        """
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Parse an object file in either legacy or new style format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        # Subclass responsibility: populate attributes from chunks.
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        # Subclass responsibility: produce chunks from attributes.
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip through deserialization; any parse error is a
            # format error, and the recomputed sha must match the original.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        # "<type> <length>\0" header used for hashing and legacy storage.
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return a debug representation including the object's hex sha."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

647 

648 

class Blob(ShaFile):
    """A Git Blob object.

    A blob's serialization is simply its content, so it starts out already
    "serialized" (``_needs_serialization`` is False after construction).
    """

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Create an empty blob."""
        super().__init__()
        self._chunked_text = []
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        # Content and serialization are identical for blobs.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Blob":
        """Open a blob from disk; raise NotBlobError for other object types."""
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            # Fast path: a single chunk can be split directly.
            return chunks[0].splitlines(True)  # type: ignore[no-any-return]
        # General case: a line may span chunk boundaries. ``remaining``
        # carries the unterminated tail of the previous chunk forward.
        remaining = None
        ret = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if len(lines) > 1:
                # First line completes any carried-over tail.
                ret.append((remaining or b"") + lines[0])
                ret.extend(lines[1:-1])
                remaining = lines[-1]
            elif len(lines) == 1:
                # Chunk contains no newline boundary; extend the tail.
                if remaining is None:
                    remaining = lines.pop()
                else:
                    remaining += lines.pop()
        if remaining is not None:
            ret.append(remaining)
        return ret

733 

734 

def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
        order read from the text, possibly including duplicates. Includes a
        field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None
    v = b""
    eof = False

    def _strip_last_newline(value: bytes) -> bytes:
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)

    else:
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()

789 

790 

def _format_message(
    headers: list[tuple[bytes, bytes]], body: Optional[bytes]
) -> Iterator[bytes]:
    """Serialize (field, value) headers and an optional body into chunks.

    Multi-line header values are encoded by indenting continuation lines
    with a single space, mirroring _parse_message.
    """
    for field, value in headers:
        first, *continuations = value.split(b"\n")
        yield git_line(field, first)
        for cont in continuations:
            yield b" " + cont + b"\n"
    # A blank line terminates the header section.
    yield b"\n"
    if body:
        yield body

802 

803 

804class Tag(ShaFile): 

805 """A Git Tag object.""" 

806 

807 type_name = b"tag" 

808 type_num = 4 

809 

810 __slots__ = ( 

811 "_message", 

812 "_name", 

813 "_object_class", 

814 "_object_sha", 

815 "_signature", 

816 "_tag_time", 

817 "_tag_timezone", 

818 "_tag_timezone_neg_utc", 

819 "_tagger", 

820 ) 

821 

822 _message: Optional[bytes] 

823 _name: Optional[bytes] 

824 _object_class: Optional[type["ShaFile"]] 

825 _object_sha: Optional[bytes] 

826 _signature: Optional[bytes] 

827 _tag_time: Optional[int] 

828 _tag_timezone: Optional[int] 

829 _tag_timezone_neg_utc: Optional[bool] 

830 _tagger: Optional[bytes] 

831 

832 def __init__(self) -> None: 

833 super().__init__() 

834 self._tagger = None 

835 self._tag_time = None 

836 self._tag_timezone = None 

837 self._tag_timezone_neg_utc = False 

838 self._signature: Optional[bytes] = None 

839 

840 @classmethod 

841 def from_path(cls, filename: Union[str, bytes]) -> "Tag": 

842 tag = ShaFile.from_path(filename) 

843 if not isinstance(tag, cls): 

844 raise NotTagError(_path_to_bytes(filename)) 

845 return tag 

846 

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        # Tagger is optional; validate only when present.
        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce canonical header ordering: object, type, tag, tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

885 

    def _serialize(self) -> list[bytes]:
        """Serialize this tag into chunks: headers, message, signature.

        Raises:
          ObjectFormatException: if mandatory fields are missing.
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # No timestamp recorded: emit the bare tagger identity.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # A detached signature, when present, is appended verbatim
            # after the message text.
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

919 

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # The freeform body: message text, optionally followed by a
                # detached PGP signature block.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        self._message = value
                        self._signature = None
                    else:
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

962 

963 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

964 """Get the object pointed to by this tag. 

965 

966 Returns: tuple of (object class, sha). 

967 """ 

968 if self._object_class is None or self._object_sha is None: 

969 raise ValueError("Tag object is not properly initialized") 

970 return (self._object_class, self._object_sha) 

971 

972 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

973 (self._object_class, self._object_sha) = value 

974 self._needs_serialization = True 

975 

    # Exposes the (object class, sha) pair the tag points at; assigning to
    # it marks the tag as needing reserialization (see _set_object).
    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    # Set by _deserialize when the body contains a BEGIN_PGP_SIGNATURE block,
    # or by sign(); None for unsigned tags.
    signature = serializable_property("signature", "Optional detached GPG signature")

992 

993 def sign(self, keyid: Optional[str] = None) -> None: 

994 import gpg 

995 

996 with gpg.Context(armor=True) as c: 

997 if keyid is not None: 

998 key = c.get_key(keyid) 

999 with gpg.Context(armor=True, signers=[key]) as ctx: 

1000 self.signature, unused_result = ctx.sign( 

1001 self.as_raw_string(), 

1002 mode=gpg.constants.sig.mode.DETACH, 

1003 ) 

1004 else: 

1005 self.signature, unused_result = c.sign( 

1006 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH 

1007 ) 

1008 

1009 def raw_without_sig(self) -> bytes: 

1010 """Return raw string serialization without the GPG/SSH signature. 

1011 

1012 self.signature is a signature for the returned raw byte string serialization. 

1013 """ 

1014 ret = self.as_raw_string() 

1015 if self._signature: 

1016 ret = ret[: -len(self._signature)] 

1017 return ret 

1018 

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        # Unsigned tags pass trivially.
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises BadSignatures when the signature does not
            # match the payload (serialization minus trailing signature).
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the outer loop variable ``key`` is unused
                    # and ``subkey`` re-iterates ``keys``; possibly this was
                    # intended to be ``key.subkeys`` — confirm against the
                    # gpgme bindings before changing.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

1051 

1052 

class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
    """Named tuple encapsulating a single tree entry.

    Fields:
      path: Entry name as bytes
      mode: File mode as an int
      sha: Hex SHA of the referenced object
    """

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Prefix (bytes) to join in front of this entry's path.
        Returns: A new TreeEntry with the combined path.
        Raises:
          TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the value that failed the check (self.path),
            # not the prefix argument, so the error is actionable.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1061 

1062 

def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, reject modes with a leading zero
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Octal mode runs up to the first space.
        space_at = text.index(b" ", pos)
        mode_bytes = text[pos:space_at]
        if strict and mode_bytes.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}")
        try:
            mode = int(mode_bytes, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}") from exc
        # Name runs up to the NUL; a raw 20-byte sha follows immediately.
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]
        pos = nul_at + 21
        raw_sha = text[nul_at + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))

1092 

1093 

def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks, one chunk per entry
    """
    for name, mode, hexsha in items:
        mode_field = f"{mode:04o}".encode("ascii")
        yield b"".join([mode_field, b" ", name, b"\0", hex_to_sha(hexsha)])

1105 

1106 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    sort_key = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=sort_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1130 

1131 

def key_entry(entry: "tuple[bytes, tuple[int, ObjectID]]") -> bytes:
    """Sort key for tree entry in tree order.

    Directories compare as if their name carried a trailing '/'.

    Args:
      entry: (name, (mode, sha)) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name

1142 

1143 

def key_entry_name_order(entry: "tuple[bytes, tuple[int, ObjectID]]") -> bytes:
    """Sort key for tree entry in plain name order."""
    name, _value = entry
    return name

1147 

1148 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Encoding used to decode the entry name
    Returns: string describing the tree entry
    """
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1170 

1171 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: "ObjectID") -> None:
        """Record where resolution stopped.

        Args:
          path: Path up to the submodule entry.
          sha: SHA of the submodule entry.
        """
        self.path = path
        self.sha = sha

1178 

1179 

class Tree(ShaFile):
    """A Git tree object.

    Maps entry names (bytes) to (mode, hex sha) tuples and supports a
    dict-like protocol plus (de)serialization to the git tree format.
    """

    type_name = b"tree"
    type_num = 2

    # Single slot: the name -> (mode, hexsha) mapping.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Create an empty tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Load a tree from a loose-object file, rejecting non-trees."""
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Return True if an entry with this name exists."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Return the (mode, hexsha) tuple for the named entry."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Remove the named entry and mark the tree for reserialization."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return the number of entries in the tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over entry names (unsorted, dict order)."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # Modes git itself will write; anything else is rejected.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        # Re-parse in strict mode so zero-padded modes are rejected too.
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            entry = (name, (mode, sha))
            if last:
                # Entries must be sorted in tree order and unique.
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize entries (in tree order) to chunks."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable, ls-tree-like listing of the entries."""
        text: list[str] = []
        for name, mode, hexsha in self.iteritems():
            text.append(pretty_format_tree_entry(name, mode, hexsha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.
        Raises:
          SubmoduleEncountered: if a gitlink entry is crossed mid-path.
          NotTreeError: if an intermediate component is not a tree.
          ValueError: if the path contains no usable components.
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/duplicate slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Cannot descend into a submodule; report where we stopped.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1342 

1343 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    Raises:
      ValueError: if the fragment does not start with '+' or '-' or the
        remainder is not a valid integer.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        # f-string produces the same message as the original
        # "...({text})".format(**vars()) without the vars() indirection.
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # Integer divmod instead of int(offset / 100): avoids float rounding
    # for very large offsets while giving identical results otherwise.
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1371 

1372 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    Returns: The ``[+-]HHMM`` fragment as ASCII bytes.
    Raises:
      ValueError: if offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Integer divmod instead of formatting float quotients with %d:
    # exact for arbitrarily large offsets, identical output otherwise.
    hours, minutes = divmod(offset // 60, 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")

1389 

1390 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse event.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the entire value is the identity, no timestamp.
        return (value, None, (None, False))
    try:
        person = value[: sep + 1]
        timetext, timezonetext = value[sep + 2 :].rsplit(b" ", 1)
        time = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, time, (timezone, timezone_neg_utc)

1416 

1417 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event.

    Args:
      person: Identity bytes (``Name <email>``)
      time: Seconds since the epoch
      timezone_info: Tuple of (offset in seconds, negative-UTC flag)
    Returns: ``person timestamp timezone`` joined by single spaces
    """
    offset, neg_utc = timezone_info
    timestamp = str(time).encode("ascii")
    return person + b" " + timestamp + b" " + format_timezone(offset, neg_utc)

1426 

1427 

# Deprecated since 0.21.0 (per the decorator); kept for compatibility.
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    Raises:
      ObjectFormatException: if a header that requires a value has none.
    """
    parents = []
    extra = []
    tree = None
    # author/commit info default to "absent" triples until parsed.
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Mergetag values are embedded tag objects; restore the trailing
            # newline stripped during header parsing before re-parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # A None field marks the message body.
            message = value
        else:
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            # Unknown headers are preserved verbatim.
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1506 

1507 

class Commit(ShaFile):
    """A git commit object.

    Holds the tree, parents, author/committer identities with timestamps,
    optional encoding, mergetags, GPG signature, message, and any extra
    (unknown) headers preserved verbatim.
    """

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Create an empty commit with no parents or signature."""
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: Optional[bytes] = None
        self._mergetag: list[Tag] = []
        self._gpgsig: Optional[bytes] = None
        self._extra: list[tuple[bytes, Optional[bytes]]] = []
        self._author_timezone_neg_utc: Optional[bool] = False
        self._commit_timezone_neg_utc: Optional[bool] = False

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Commit":
        """Load a commit from a loose-object file, rejecting non-commits."""
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate attributes from the serialized commit body.

        Raises:
          ObjectFormatException: if author/committer headers lack a value.
        """
        self._parents = []
        self._extra = []
        self._tree = None
        # author/commit info default to "absent" triples until parsed.
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Mergetag values embed a whole tag object; restore the
                # trailing newline before re-parsing it.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # A None field marks the message body.
                self._message = value
            else:
                # Unknown headers are preserved verbatim for reserialization.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Verify header ordering: tree first, then parents, author,
        # committer, encoding.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached, armored GPG signature over this commit.

        Args:
          keyid: Optional key id to sign with; defaults to the GPG
            default signing key when omitted.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Work on a copy so the signed original is untouched; clearing both
        # the slot and the property presumably forces reserialization —
        # confirm against serializable_property.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        # Unsigned commits pass trivially.
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): outer loop variable ``key`` is unused and
                    # ``subkey`` re-iterates ``keys``; possibly this was meant
                    # to be ``key.subkeys`` — confirm before changing.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message back into commit format."""
        headers = []
        assert self._tree is not None
        # Accept either a Tree object or a raw hex sha for the tree field.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the trailing newline added by tag serialization.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

1823 

1824 

# All concrete ShaFile subclasses, used to build the type lookup table.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps both the textual type name (b"commit", ...) and the numeric type
# id (1, 2, ...) to the corresponding class.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls


# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Accelerated extension not built/installed; keep pure-Python versions.
    pass
else:
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs