Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

956 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 

6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

7# General Public License as public by the Free Software Foundation; version 2.0 

8# or (at your option) any later version. You can redistribute it and/or 

9# modify it under the terms of either of these two licenses. 

10# 

11# Unless required by applicable law or agreed to in writing, software 

12# distributed under the License is distributed on an "AS IS" BASIS, 

13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16# 

17# You should have received a copy of the licenses; if not, see 

18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

20# License, Version 2.0. 

21# 

22 

23"""Access to base git objects.""" 

24 

25import binascii 

26import os 

27import posixpath 

28import stat 

29import zlib 

30from collections import namedtuple 

31from collections.abc import Callable, Iterable, Iterator 

32from hashlib import sha1 

33from io import BufferedIOBase, BytesIO 

34from typing import ( 

35 IO, 

36 TYPE_CHECKING, 

37 Optional, 

38 Union, 

39) 

40 

41try: 

42 from typing import TypeGuard # type: ignore 

43except ImportError: 

44 from typing_extensions import TypeGuard 

45 

46from . import replace_me 

47from .errors import ( 

48 ChecksumMismatch, 

49 FileFormatException, 

50 NotBlobError, 

51 NotCommitError, 

52 NotTagError, 

53 NotTreeError, 

54 ObjectFormatException, 

55) 

56from .file import GitFile 

57 

58if TYPE_CHECKING: 

59 from _hashlib import HASH 

60 

61 from .file import _GitFile 

62 

63ZERO_SHA = b"0" * 40 

64 

65# Header fields for commits 

66_TREE_HEADER = b"tree" 

67_PARENT_HEADER = b"parent" 

68_AUTHOR_HEADER = b"author" 

69_COMMITTER_HEADER = b"committer" 

70_ENCODING_HEADER = b"encoding" 

71_MERGETAG_HEADER = b"mergetag" 

72_GPGSIG_HEADER = b"gpgsig" 

73 

74# Header fields for objects 

75_OBJECT_HEADER = b"object" 

76_TYPE_HEADER = b"type" 

77_TAG_HEADER = b"tag" 

78_TAGGER_HEADER = b"tagger" 

79 

80 

81S_IFGITLINK = 0o160000 

82 

83 

84MAX_TIME = 9223372036854775807 # (2**63) - 1 - signed long int max 

85 

86BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----" 

87 

88 

89ObjectID = bytes 

90 

91 

92class EmptyFileException(FileFormatException): 

93 """An unexpectedly empty file was encountered.""" 

94 

95 

96def S_ISGITLINK(m: int) -> bool: 

97 """Check if a mode indicates a submodule. 

98 

99 Args: 

100 m: Mode to check 

101 Returns: a ``boolean`` 

102 """ 

103 return stat.S_IFMT(m) == S_IFGITLINK 

104 

105 

106def _decompress(string: bytes) -> bytes: 

107 dcomp = zlib.decompressobj() 

108 dcomped = dcomp.decompress(string) 

109 dcomped += dcomp.flush() 

110 return dcomped 

111 

112 

113def sha_to_hex(sha: ObjectID) -> bytes: 

114 """Takes a string and returns the hex of the sha within.""" 

115 hexsha = binascii.hexlify(sha) 

116 assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}" 

117 return hexsha 

118 

119 

120def hex_to_sha(hex: Union[bytes, str]) -> bytes: 

121 """Takes a hex sha and returns a binary sha.""" 

122 assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}" 

123 try: 

124 return binascii.unhexlify(hex) 

125 except TypeError as exc: 

126 if not isinstance(hex, bytes): 

127 raise 

128 raise ValueError(exc.args[0]) from exc 

129 

130 

131def valid_hexsha(hex: Union[bytes, str]) -> bool: 

132 if len(hex) != 40: 

133 return False 

134 try: 

135 binascii.unhexlify(hex) 

136 except (TypeError, binascii.Error): 

137 return False 

138 else: 

139 return True 

140 

141 

142def hex_to_filename( 

143 path: Union[str, bytes], hex: Union[str, bytes] 

144) -> Union[str, bytes]: 

145 """Takes a hex sha and returns its filename relative to the given path.""" 

146 # os.path.join accepts bytes or unicode, but all args must be of the same 

147 # type. Make sure that hex which is expected to be bytes, is the same type 

148 # as path. 

149 if type(path) is not type(hex) and getattr(path, "encode", None) is not None: 

150 hex = hex.decode("ascii") # type: ignore 

151 dir = hex[:2] 

152 file = hex[2:] 

153 # Check from object dir 

154 return os.path.join(path, dir, file) # type: ignore 

155 

156 

157def filename_to_hex(filename: Union[str, bytes]) -> str: 

158 """Takes an object filename and returns its corresponding hex sha.""" 

159 # grab the last (up to) two path components 

160 names = filename.rsplit(os.path.sep, 2)[-2:] # type: ignore 

161 errmsg = f"Invalid object filename: {filename!r}" 

162 assert len(names) == 2, errmsg 

163 base, rest = names 

164 assert len(base) == 2 and len(rest) == 38, errmsg 

165 hex_bytes = (base + rest).encode("ascii") # type: ignore 

166 hex_to_sha(hex_bytes) 

167 return hex_bytes.decode("ascii") 

168 

169 

170def object_header(num_type: int, length: int) -> bytes: 

171 """Return an object header for the given numeric type and text length.""" 

172 cls = object_class(num_type) 

173 if cls is None: 

174 raise AssertionError(f"unsupported class type num: {num_type}") 

175 return cls.type_name + b" " + str(length).encode("ascii") + b"\0" 

176 

177 

178def serializable_property(name: str, docstring: Optional[str] = None) -> property: 

179 """A property that helps tracking whether serialization is necessary.""" 

180 

181 def set(obj: "ShaFile", value: object) -> None: 

182 setattr(obj, "_" + name, value) 

183 obj._needs_serialization = True 

184 

185 def get(obj: "ShaFile") -> object: 

186 return getattr(obj, "_" + name) 

187 

188 return property(get, set, doc=docstring) 

189 

190 

191def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]: 

192 """Get the object class corresponding to the given type. 

193 

194 Args: 

195 type: Either a type name string or a numeric type. 

196 Returns: The ShaFile subclass corresponding to the given type, or None if 

197 type is not a valid type name/number. 

198 """ 

199 return _TYPE_MAP.get(type, None) 

200 

201 

202def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None: 

203 """Check if a string is a valid hex sha string. 

204 

205 Args: 

206 hex: Hex string to check 

207 error_msg: Error message to use in exception 

208 Raises: 

209 ObjectFormatException: Raised when the string is not valid 

210 """ 

211 if not valid_hexsha(hex): 

212 raise ObjectFormatException(f"{error_msg} {hex!r}") 

213 

214 

215def check_identity(identity: Optional[bytes], error_msg: str) -> None: 

216 """Check if the specified identity is valid. 

217 

218 This will raise an exception if the identity is not valid. 

219 

220 Args: 

221 identity: Identity string 

222 error_msg: Error message to use in exception 

223 """ 

224 if identity is None: 

225 raise ObjectFormatException(error_msg) 

226 email_start = identity.find(b"<") 

227 email_end = identity.find(b">") 

228 if not all( 

229 [ 

230 email_start >= 1, 

231 identity[email_start - 1] == b" "[0], 

232 identity.find(b"<", email_start + 1) == -1, 

233 email_end == len(identity) - 1, 

234 b"\0" not in identity, 

235 b"\n" not in identity, 

236 ] 

237 ): 

238 raise ObjectFormatException(error_msg) 

239 

240 

241def check_time(time_seconds: int) -> None: 

242 """Check if the specified time is not prone to overflow error. 

243 

244 This will raise an exception if the time is not valid. 

245 

246 Args: 

247 time_seconds: time in seconds 

248 

249 """ 

250 # Prevent overflow error 

251 if time_seconds > MAX_TIME: 

252 raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}") 

253 

254 

255def git_line(*items: bytes) -> bytes: 

256 """Formats items into a space separated line.""" 

257 return b" ".join(items) + b"\n" 

258 

259 

260class FixedSha: 

261 """SHA object that behaves like hashlib's but is given a fixed value.""" 

262 

263 __slots__ = ("_hexsha", "_sha") 

264 

265 def __init__(self, hexsha: Union[str, bytes]) -> None: 

266 if getattr(hexsha, "encode", None) is not None: 

267 hexsha = hexsha.encode("ascii") # type: ignore 

268 if not isinstance(hexsha, bytes): 

269 raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}") 

270 self._hexsha = hexsha 

271 self._sha = hex_to_sha(hexsha) 

272 

273 def digest(self) -> bytes: 

274 """Return the raw SHA digest.""" 

275 return self._sha 

276 

277 def hexdigest(self) -> str: 

278 """Return the hex SHA digest.""" 

279 return self._hexsha.decode("ascii") 

280 

281 

282# Type guard functions for runtime type narrowing 

283if TYPE_CHECKING: 

284 

285 def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]: 

286 """Check if a ShaFile is a Commit.""" 

287 return obj.type_name == b"commit" 

288 

289 def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]: 

290 """Check if a ShaFile is a Tree.""" 

291 return obj.type_name == b"tree" 

292 

293 def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]: 

294 """Check if a ShaFile is a Blob.""" 

295 return obj.type_name == b"blob" 

296 

297 def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]: 

298 """Check if a ShaFile is a Tag.""" 

299 return obj.type_name == b"tag" 

300else: 

301 # Runtime versions without type narrowing 

302 def is_commit(obj: "ShaFile") -> bool: 

303 """Check if a ShaFile is a Commit.""" 

304 return obj.type_name == b"commit" 

305 

306 def is_tree(obj: "ShaFile") -> bool: 

307 """Check if a ShaFile is a Tree.""" 

308 return obj.type_name == b"tree" 

309 

310 def is_blob(obj: "ShaFile") -> bool: 

311 """Check if a ShaFile is a Blob.""" 

312 return obj.type_name == b"blob" 

313 

314 def is_tag(obj: "ShaFile") -> bool: 

315 """Check if a ShaFile is a Tag.""" 

316 return obj.type_name == b"tag" 

317 

318 

319class ShaFile: 

320 """A git SHA file.""" 

321 

322 __slots__ = ("_chunked_text", "_needs_serialization", "_sha") 

323 

324 _needs_serialization: bool 

325 type_name: bytes 

326 type_num: int 

327 _chunked_text: Optional[list[bytes]] 

328 _sha: Union[FixedSha, None, "HASH"] 

329 

330 @staticmethod 

331 def _parse_legacy_object_header( 

332 magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"] 

333 ) -> "ShaFile": 

334 """Parse a legacy object, creating it but not reading the file.""" 

335 bufsize = 1024 

336 decomp = zlib.decompressobj() 

337 header = decomp.decompress(magic) 

338 start = 0 

339 end = -1 

340 while end < 0: 

341 extra = f.read(bufsize) 

342 header += decomp.decompress(extra) 

343 magic += extra 

344 end = header.find(b"\0", start) 

345 start = len(header) 

346 header = header[:end] 

347 type_name, size = header.split(b" ", 1) 

348 try: 

349 int(size) # sanity check 

350 except ValueError as exc: 

351 raise ObjectFormatException(f"Object size not an integer: {exc}") from exc 

352 obj_class = object_class(type_name) 

353 if not obj_class: 

354 raise ObjectFormatException( 

355 "Not a known type: {}".format(type_name.decode("ascii")) 

356 ) 

357 return obj_class() 

358 

359 def _parse_legacy_object(self, map: bytes) -> None: 

360 """Parse a legacy object, setting the raw string.""" 

361 text = _decompress(map) 

362 header_end = text.find(b"\0") 

363 if header_end < 0: 

364 raise ObjectFormatException("Invalid object header, no \\0") 

365 self.set_raw_string(text[header_end + 1 :]) 

366 

367 def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]: 

368 """Return chunks representing the object in the experimental format. 

369 

370 Returns: List of strings 

371 """ 

372 compobj = zlib.compressobj(compression_level) 

373 yield compobj.compress(self._header()) 

374 for chunk in self.as_raw_chunks(): 

375 yield compobj.compress(chunk) 

376 yield compobj.flush() 

377 

378 def as_legacy_object(self, compression_level: int = -1) -> bytes: 

379 """Return string representing the object in the experimental format.""" 

380 return b"".join( 

381 self.as_legacy_object_chunks(compression_level=compression_level) 

382 ) 

383 

384 def as_raw_chunks(self) -> list[bytes]: 

385 """Return chunks with serialization of the object. 

386 

387 Returns: List of strings, not necessarily one per line 

388 """ 

389 if self._needs_serialization: 

390 self._sha = None 

391 self._chunked_text = self._serialize() 

392 self._needs_serialization = False 

393 return self._chunked_text # type: ignore 

394 

395 def as_raw_string(self) -> bytes: 

396 """Return raw string with serialization of the object. 

397 

398 Returns: String object 

399 """ 

400 return b"".join(self.as_raw_chunks()) 

401 

402 def __bytes__(self) -> bytes: 

403 """Return raw string serialization of this object.""" 

404 return self.as_raw_string() 

405 

406 def __hash__(self) -> int: 

407 """Return unique hash for this object.""" 

408 return hash(self.id) 

409 

410 def as_pretty_string(self) -> str: 

411 """Return a string representing this object, fit for display.""" 

412 return self.as_raw_string().decode("utf-8", "replace") 

413 

414 def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None: 

415 """Set the contents of this object from a serialized string.""" 

416 if not isinstance(text, bytes): 

417 raise TypeError(f"Expected bytes for text, got {text!r}") 

418 self.set_raw_chunks([text], sha) 

419 

420 def set_raw_chunks( 

421 self, chunks: list[bytes], sha: Optional[ObjectID] = None 

422 ) -> None: 

423 """Set the contents of this object from a list of chunks.""" 

424 self._chunked_text = chunks 

425 self._deserialize(chunks) 

426 if sha is None: 

427 self._sha = None 

428 else: 

429 self._sha = FixedSha(sha) # type: ignore 

430 self._needs_serialization = False 

431 

432 @staticmethod 

433 def _parse_object_header( 

434 magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"] 

435 ) -> "ShaFile": 

436 """Parse a new style object, creating it but not reading the file.""" 

437 num_type = (ord(magic[0:1]) >> 4) & 7 

438 obj_class = object_class(num_type) 

439 if not obj_class: 

440 raise ObjectFormatException(f"Not a known type {num_type}") 

441 return obj_class() 

442 

443 def _parse_object(self, map: bytes) -> None: 

444 """Parse a new style object, setting self._text.""" 

445 # skip type and size; type must have already been determined, and 

446 # we trust zlib to fail if it's otherwise corrupted 

447 byte = ord(map[0:1]) 

448 used = 1 

449 while (byte & 0x80) != 0: 

450 byte = ord(map[used : used + 1]) 

451 used += 1 

452 raw = map[used:] 

453 self.set_raw_string(_decompress(raw)) 

454 

455 @classmethod 

456 def _is_legacy_object(cls, magic: bytes) -> bool: 

457 b0 = ord(magic[0:1]) 

458 b1 = ord(magic[1:2]) 

459 word = (b0 << 8) + b1 

460 return (b0 & 0x8F) == 0x08 and (word % 31) == 0 

461 

462 @classmethod 

463 def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile": 

464 map = f.read() 

465 if not map: 

466 raise EmptyFileException("Corrupted empty file detected") 

467 

468 if cls._is_legacy_object(map): 

469 obj = cls._parse_legacy_object_header(map, f) 

470 obj._parse_legacy_object(map) 

471 else: 

472 obj = cls._parse_object_header(map, f) 

473 obj._parse_object(map) 

474 return obj 

475 

476 def __init__(self) -> None: 

477 """Don't call this directly.""" 

478 self._sha = None 

479 self._chunked_text = [] 

480 self._needs_serialization = True 

481 

482 def _deserialize(self, chunks: list[bytes]) -> None: 

483 raise NotImplementedError(self._deserialize) 

484 

485 def _serialize(self) -> list[bytes]: 

486 raise NotImplementedError(self._serialize) 

487 

488 @classmethod 

489 def from_path(cls, path: Union[str, bytes]) -> "ShaFile": 

490 """Open a SHA file from disk.""" 

491 with GitFile(path, "rb") as f: 

492 return cls.from_file(f) 

493 

494 @classmethod 

495 def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile": 

496 """Get the contents of a SHA file on disk.""" 

497 try: 

498 obj = cls._parse_file(f) 

499 obj._sha = None 

500 return obj 

501 except (IndexError, ValueError) as exc: 

502 raise ObjectFormatException("invalid object header") from exc 

503 

504 @staticmethod 

505 def from_raw_string( 

506 type_num: int, string: bytes, sha: Optional[ObjectID] = None 

507 ) -> "ShaFile": 

508 """Creates an object of the indicated type from the raw string given. 

509 

510 Args: 

511 type_num: The numeric type of the object. 

512 string: The raw uncompressed contents. 

513 sha: Optional known sha for the object 

514 """ 

515 cls = object_class(type_num) 

516 if cls is None: 

517 raise AssertionError(f"unsupported class type num: {type_num}") 

518 obj = cls() 

519 obj.set_raw_string(string, sha) 

520 return obj 

521 

522 @staticmethod 

523 def from_raw_chunks( 

524 type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None 

525 ) -> "ShaFile": 

526 """Creates an object of the indicated type from the raw chunks given. 

527 

528 Args: 

529 type_num: The numeric type of the object. 

530 chunks: An iterable of the raw uncompressed contents. 

531 sha: Optional known sha for the object 

532 """ 

533 cls = object_class(type_num) 

534 if cls is None: 

535 raise AssertionError(f"unsupported class type num: {type_num}") 

536 obj = cls() 

537 obj.set_raw_chunks(chunks, sha) 

538 return obj 

539 

540 @classmethod 

541 def from_string(cls, string: bytes) -> "ShaFile": 

542 """Create a ShaFile from a string.""" 

543 obj = cls() 

544 obj.set_raw_string(string) 

545 return obj 

546 

547 def _check_has_member(self, member: str, error_msg: str) -> None: 

548 """Check that the object has a given member variable. 

549 

550 Args: 

551 member: the member variable to check for 

552 error_msg: the message for an error if the member is missing 

553 Raises: 

554 ObjectFormatException: with the given error_msg if member is 

555 missing or is None 

556 """ 

557 if getattr(self, member, None) is None: 

558 raise ObjectFormatException(error_msg) 

559 

560 def check(self) -> None: 

561 """Check this object for internal consistency. 

562 

563 Raises: 

564 ObjectFormatException: if the object is malformed in some way 

565 ChecksumMismatch: if the object was created with a SHA that does 

566 not match its contents 

567 """ 

568 # TODO: if we find that error-checking during object parsing is a 

569 # performance bottleneck, those checks should be moved to the class's 

570 # check() method during optimization so we can still check the object 

571 # when necessary. 

572 old_sha = self.id 

573 try: 

574 self._deserialize(self.as_raw_chunks()) 

575 self._sha = None 

576 new_sha = self.id 

577 except Exception as exc: 

578 raise ObjectFormatException(exc) from exc 

579 if old_sha != new_sha: 

580 raise ChecksumMismatch(new_sha, old_sha) 

581 

582 def _header(self) -> bytes: 

583 return object_header(self.type_num, self.raw_length()) 

584 

585 def raw_length(self) -> int: 

586 """Returns the length of the raw string of this object.""" 

587 return sum(map(len, self.as_raw_chunks())) 

588 

589 def sha(self) -> Union[FixedSha, "HASH"]: 

590 """The SHA1 object that is the name of this object.""" 

591 if self._sha is None or self._needs_serialization: 

592 # this is a local because as_raw_chunks() overwrites self._sha 

593 new_sha = sha1() 

594 new_sha.update(self._header()) 

595 for chunk in self.as_raw_chunks(): 

596 new_sha.update(chunk) 

597 self._sha = new_sha 

598 return self._sha 

599 

600 def copy(self) -> "ShaFile": 

601 """Create a new copy of this SHA1 object from its raw string.""" 

602 obj_class = object_class(self.type_num) 

603 if obj_class is None: 

604 raise AssertionError(f"invalid type num {self.type_num}") 

605 return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id) 

606 

607 @property 

608 def id(self) -> bytes: 

609 """The hex SHA of this object.""" 

610 return self.sha().hexdigest().encode("ascii") 

611 

612 def __repr__(self) -> str: 

613 return f"<{self.__class__.__name__} {self.id!r}>" 

614 

615 def __ne__(self, other: object) -> bool: 

616 """Check whether this object does not match the other.""" 

617 return not isinstance(other, ShaFile) or self.id != other.id 

618 

619 def __eq__(self, other: object) -> bool: 

620 """Return True if the SHAs of the two objects match.""" 

621 return isinstance(other, ShaFile) and self.id == other.id 

622 

623 def __lt__(self, other: object) -> bool: 

624 """Return whether SHA of this object is less than the other.""" 

625 if not isinstance(other, ShaFile): 

626 raise TypeError 

627 return self.id < other.id 

628 

629 def __le__(self, other: object) -> bool: 

630 """Check whether SHA of this object is less than or equal to the other.""" 

631 if not isinstance(other, ShaFile): 

632 raise TypeError 

633 return self.id <= other.id 

634 

635 

636class Blob(ShaFile): 

637 """A Git Blob object.""" 

638 

639 __slots__ = () 

640 

641 type_name = b"blob" 

642 type_num = 3 

643 

644 _chunked_text: list[bytes] 

645 

646 def __init__(self) -> None: 

647 super().__init__() 

648 self._chunked_text = [] 

649 self._needs_serialization = False 

650 

651 def _get_data(self) -> bytes: 

652 return self.as_raw_string() 

653 

654 def _set_data(self, data: bytes) -> None: 

655 self.set_raw_string(data) 

656 

657 data = property( 

658 _get_data, _set_data, doc="The text contained within the blob object." 

659 ) 

660 

661 def _get_chunked(self) -> list[bytes]: 

662 return self._chunked_text 

663 

664 def _set_chunked(self, chunks: list[bytes]) -> None: 

665 self._chunked_text = chunks 

666 

667 def _serialize(self) -> list[bytes]: 

668 return self._chunked_text 

669 

670 def _deserialize(self, chunks: list[bytes]) -> None: 

671 self._chunked_text = chunks 

672 

673 chunked = property( 

674 _get_chunked, 

675 _set_chunked, 

676 doc="The text in the blob object, as chunks (not necessarily lines)", 

677 ) 

678 

679 @classmethod 

680 def from_path(cls, path: Union[str, bytes]) -> "Blob": 

681 blob = ShaFile.from_path(path) 

682 if not isinstance(blob, cls): 

683 raise NotBlobError(path) 

684 return blob 

685 

686 def check(self) -> None: 

687 """Check this object for internal consistency. 

688 

689 Raises: 

690 ObjectFormatException: if the object is malformed in some way 

691 """ 

692 super().check() 

693 

694 def splitlines(self) -> list[bytes]: 

695 """Return list of lines in this blob. 

696 

697 This preserves the original line endings. 

698 """ 

699 chunks = self.chunked 

700 if not chunks: 

701 return [] 

702 if len(chunks) == 1: 

703 return chunks[0].splitlines(True) 

704 remaining = None 

705 ret = [] 

706 for chunk in chunks: 

707 lines = chunk.splitlines(True) 

708 if len(lines) > 1: 

709 ret.append((remaining or b"") + lines[0]) 

710 ret.extend(lines[1:-1]) 

711 remaining = lines[-1] 

712 elif len(lines) == 1: 

713 if remaining is None: 

714 remaining = lines.pop() 

715 else: 

716 remaining += lines.pop() 

717 if remaining is not None: 

718 ret.append(remaining) 

719 return ret 

720 

721 

722def _parse_message( 

723 chunks: Iterable[bytes], 

724) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]: 

725 """Parse a message with a list of fields and a body. 

726 

727 Args: 

728 chunks: the raw chunks of the tag or commit object. 

729 Returns: iterator of tuples of (field, value), one per header line, in the 

730 order read from the text, possibly including duplicates. Includes a 

731 field named None for the freeform tag/commit text. 

732 """ 

733 f = BytesIO(b"".join(chunks)) 

734 k = None 

735 v = b"" 

736 eof = False 

737 

738 def _strip_last_newline(value: bytes) -> bytes: 

739 """Strip the last newline from value.""" 

740 if value and value.endswith(b"\n"): 

741 return value[:-1] 

742 return value 

743 

744 # Parse the headers 

745 # 

746 # Headers can contain newlines. The next line is indented with a space. 

747 # We store the latest key as 'k', and the accumulated value as 'v'. 

748 for line in f: 

749 if line.startswith(b" "): 

750 # Indented continuation of the previous line 

751 v += line[1:] 

752 else: 

753 if k is not None: 

754 # We parsed a new header, return its value 

755 yield (k, _strip_last_newline(v)) 

756 if line == b"\n": 

757 # Empty line indicates end of headers 

758 break 

759 (k, v) = line.split(b" ", 1) 

760 

761 else: 

762 # We reached end of file before the headers ended. We still need to 

763 # return the previous header, then we need to return a None field for 

764 # the text. 

765 eof = True 

766 if k is not None: 

767 yield (k, _strip_last_newline(v)) 

768 yield (None, None) 

769 

770 if not eof: 

771 # We didn't reach the end of file while parsing headers. We can return 

772 # the rest of the file as a message. 

773 yield (None, f.read()) 

774 

775 f.close() 

776 

777 

778def _format_message( 

779 headers: list[tuple[bytes, bytes]], body: Optional[bytes] 

780) -> Iterator[bytes]: 

781 for field, value in headers: 

782 lines = value.split(b"\n") 

783 yield git_line(field, lines[0]) 

784 for line in lines[1:]: 

785 yield b" " + line + b"\n" 

786 yield b"\n" # There must be a new line after the headers 

787 if body: 

788 yield body 

789 

790 

791class Tag(ShaFile): 

792 """A Git Tag object.""" 

793 

794 type_name = b"tag" 

795 type_num = 4 

796 

797 __slots__ = ( 

798 "_message", 

799 "_name", 

800 "_object_class", 

801 "_object_sha", 

802 "_signature", 

803 "_tag_time", 

804 "_tag_timezone", 

805 "_tag_timezone_neg_utc", 

806 "_tagger", 

807 ) 

808 

809 _message: Optional[bytes] 

810 _name: Optional[bytes] 

811 _object_class: Optional[type["ShaFile"]] 

812 _object_sha: Optional[bytes] 

813 _signature: Optional[bytes] 

814 _tag_time: Optional[int] 

815 _tag_timezone: Optional[int] 

816 _tag_timezone_neg_utc: Optional[bool] 

817 _tagger: Optional[bytes] 

818 

819 def __init__(self) -> None: 

820 super().__init__() 

821 self._tagger = None 

822 self._tag_time = None 

823 self._tag_timezone = None 

824 self._tag_timezone_neg_utc = False 

825 self._signature: Optional[bytes] = None 

826 

827 @classmethod 

828 def from_path(cls, filename: Union[str, bytes]) -> "Tag": 

829 tag = ShaFile.from_path(filename) 

830 if not isinstance(tag, cls): 

831 raise NotTagError(filename) 

832 return tag 

833 

834 def check(self) -> None: 

835 """Check this object for internal consistency. 

836 

837 Raises: 

838 ObjectFormatException: if the object is malformed in some way 

839 """ 

840 super().check() 

841 assert self._chunked_text is not None 

842 self._check_has_member("_object_sha", "missing object sha") 

843 self._check_has_member("_object_class", "missing object type") 

844 self._check_has_member("_name", "missing tag name") 

845 

846 if not self._name: 

847 raise ObjectFormatException("empty tag name") 

848 

849 if self._object_sha is None: 

850 raise ObjectFormatException("missing object sha") 

851 check_hexsha(self._object_sha, "invalid object sha") 

852 

853 if self._tagger is not None: 

854 check_identity(self._tagger, "invalid tagger") 

855 

856 self._check_has_member("_tag_time", "missing tag time") 

857 if self._tag_time is None: 

858 raise ObjectFormatException("missing tag time") 

859 check_time(self._tag_time) 

860 

861 last = None 

862 for field, _ in _parse_message(self._chunked_text): 

863 if field == _OBJECT_HEADER and last is not None: 

864 raise ObjectFormatException("unexpected object") 

865 elif field == _TYPE_HEADER and last != _OBJECT_HEADER: 

866 raise ObjectFormatException("unexpected type") 

867 elif field == _TAG_HEADER and last != _TYPE_HEADER: 

868 raise ObjectFormatException("unexpected tag name") 

869 elif field == _TAGGER_HEADER and last != _TAG_HEADER: 

870 raise ObjectFormatException("unexpected tagger") 

871 last = field 

872 

873 def _serialize(self) -> list[bytes]: 

874 headers = [] 

875 if self._object_sha is None: 

876 raise ObjectFormatException("missing object sha") 

877 headers.append((_OBJECT_HEADER, self._object_sha)) 

878 if self._object_class is None: 

879 raise ObjectFormatException("missing object class") 

880 headers.append((_TYPE_HEADER, self._object_class.type_name)) 

881 if self._name is None: 

882 raise ObjectFormatException("missing tag name") 

883 headers.append((_TAG_HEADER, self._name)) 

884 if self._tagger: 

885 if self._tag_time is None: 

886 headers.append((_TAGGER_HEADER, self._tagger)) 

887 else: 

888 if self._tag_timezone is None or self._tag_timezone_neg_utc is None: 

889 raise ObjectFormatException("missing timezone info") 

890 headers.append( 

891 ( 

892 _TAGGER_HEADER, 

893 format_time_entry( 

894 self._tagger, 

895 self._tag_time, 

896 (self._tag_timezone, self._tag_timezone_neg_utc), 

897 ), 

898 ) 

899 ) 

900 

901 if self.message is None and self._signature is None: 

902 body = None 

903 else: 

904 body = (self.message or b"") + (self._signature or b"") 

905 return list(_format_message(headers, body)) 

906 

907 def _deserialize(self, chunks: list[bytes]) -> None: 

908 """Grab the metadata attached to the tag.""" 

909 self._tagger = None 

910 self._tag_time = None 

911 self._tag_timezone = None 

912 self._tag_timezone_neg_utc = False 

913 for field, value in _parse_message(chunks): 

914 if field == _OBJECT_HEADER: 

915 self._object_sha = value 

916 elif field == _TYPE_HEADER: 

917 assert isinstance(value, bytes) 

918 obj_class = object_class(value) 

919 if not obj_class: 

920 raise ObjectFormatException(f"Not a known type: {value!r}") 

921 self._object_class = obj_class 

922 elif field == _TAG_HEADER: 

923 self._name = value 

924 elif field == _TAGGER_HEADER: 

925 if value is None: 

926 raise ObjectFormatException("missing tagger value") 

927 ( 

928 self._tagger, 

929 self._tag_time, 

930 (self._tag_timezone, self._tag_timezone_neg_utc), 

931 ) = parse_time_entry(value) 

932 elif field is None: 

933 if value is None: 

934 self._message = None 

935 self._signature = None 

936 else: 

937 try: 

938 sig_idx = value.index(BEGIN_PGP_SIGNATURE) 

939 except ValueError: 

940 self._message = value 

941 self._signature = None 

942 else: 

943 self._message = value[:sig_idx] 

944 self._signature = value[sig_idx:] 

945 else: 

946 raise ObjectFormatException( 

947 f"Unknown field {field.decode('ascii', 'replace')}" 

948 ) 

949 

950 def _get_object(self) -> tuple[type[ShaFile], bytes]: 

951 """Get the object pointed to by this tag. 

952 

953 Returns: tuple of (object class, sha). 

954 """ 

955 if self._object_class is None or self._object_sha is None: 

956 raise ValueError("Tag object is not properly initialized") 

957 return (self._object_class, self._object_sha) 

958 

959 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None: 

960 (self._object_class, self._object_sha) = value 

961 self._needs_serialization = True 

962 

    # ``object`` intentionally shadows the builtin: it is the public name of
    # the (object class, sha) pair this tag points at.
    object = property(_get_object, _set_object)

    # The remaining attributes are backed by the serialized tag body; setting
    # any of them marks the object as needing re-serialization.
    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

979 

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached, ASCII-armored GPG signature over this tag.

        The resulting signature is stored in ``self.signature``.

        Args:
          keyid: Optional GPG key id to sign with; when omitted, gpg's
            default signing key is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                # A second context is opened so the resolved key is installed
                # as the signer.
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.signature, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.signature, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

995 

996 def raw_without_sig(self) -> bytes: 

997 """Return raw string serialization without the GPG/SSH signature. 

998 

999 self.signature is a signature for the returned raw byte string serialization. 

1000 """ 

1001 ret = self.as_raw_string() 

1002 if self._signature: 

1003 ret = ret[: -len(self._signature)] 

1004 return ret 

1005 

1006 def verify(self, keyids: Optional[Iterable[str]] = None) -> None: 

1007 """Verify GPG signature for this tag (if it is signed). 

1008 

1009 Args: 

1010 keyids: Optional iterable of trusted keyids for this tag. 

1011 If this tag is not signed by any key in keyids verification will 

1012 fail. If not specified, this function only verifies that the tag 

1013 has a valid signature. 

1014 

1015 Raises: 

1016 gpg.errors.BadSignatures: if GPG signature verification fails 

1017 gpg.errors.MissingSignatures: if tag was not signed by a key 

1018 specified in keyids 

1019 """ 

1020 if self._signature is None: 

1021 return 

1022 

1023 import gpg 

1024 

1025 with gpg.Context() as ctx: 

1026 data, result = ctx.verify( 

1027 self.raw_without_sig(), 

1028 signature=self._signature, 

1029 ) 

1030 if keyids: 

1031 keys = [ctx.get_key(key) for key in keyids] 

1032 for key in keys: 

1033 for subkey in keys: 

1034 for sig in result.signatures: 

1035 if subkey.can_sign and subkey.fpr == sig.fpr: 

1036 return 

1037 raise gpg.errors.MissingSignatures(result, keys, results=(data, result)) 

1038 

1039 

class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
    """Named tuple encapsulating a single tree entry."""

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Directory prefix (bytes) to join in front of this entry's path.
        Returns: New TreeEntry with the joined path; mode and sha unchanged.
        Raises:
          TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Report the entry's own path: that is the value that failed the
            # check (previously the prefix argument was shown instead).
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)

1048 

1049 

def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, reject modes serialized with a leading zero
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Entry layout: b"<octal mode> <name>\0<20-byte binary sha>"
        space_at = text.index(b" ", pos)
        mode_text = text[pos:space_at]
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]
        pos = nul_at + 21
        sha = text[nul_at + 1 : pos]
        if len(sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(sha))

1079 

1080 

def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # Modes are written in octal with a minimum width of four digits.
        mode_bytes = (f"{mode:04o}").encode("ascii")
        yield mode_bytes + b" " + name + b"\0" + hex_to_sha(hexsha)

1092 

1093 

def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1117 

1118 

def key_entry(entry: "tuple[bytes, tuple[int, ObjectID]]") -> bytes:
    """Sort key for tree entry.

    Args:
      entry: (name, value) tuple
    """
    name, (mode, _sha) = entry
    # Directories sort as if their name ended in '/'.
    return name + b"/" if stat.S_ISDIR(mode) else name

1129 

1130 

def key_entry_name_order(entry: "tuple[bytes, tuple[int, ObjectID]]") -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name

1134 

1135 

def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Encoding used to decode the entry name
    Returns: string describing the tree entry
    """
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"

1157 

1158 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: "ObjectID") -> None:
        """Record the submodule's tree *path* and its commit *sha*."""
        self.path, self.sha = path, sha

1165 

1166 

class Tree(ShaFile):
    """A Git tree object.

    A tree maps entry names (bytes) to (mode, hexsha) pairs and behaves
    like a mutable mapping over those names.
    """

    type_name = b"tree"
    type_num = 2

    # A bare string is a valid __slots__ declaration for a single slot.
    __slots__ = "_entries"

    def __init__(self) -> None:
        super().__init__()
        # Maps entry name -> (mode, hexsha).
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Load a Tree from a loose-object file, rejecting other types."""
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(filename)
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check whether an entry with the given name exists."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Return the (mode, hexsha) pair for the named entry."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Remove the named entry and mark the tree dirty."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return the number of entries in this tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over entry names (in dictionary order, not tree order)."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        # Re-parse in strict mode so leading-zero modes are rejected.
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must appear in tree order and names must be unique.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize the entries in tree order."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable, ls-tree style listing of this tree."""
        text: list[str] = []
        for name, mode, hexsha in self.iteritems():
            text.append(pretty_format_tree_entry(name, mode, hexsha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.
        Raises:
          SubmoduleEncountered: if a gitlink is found partway along ``path``
          NotTreeError: if a non-tree object is encountered along the path
          ValueError: if ``path`` contains no usable components
        """
        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            # Skip empty components (leading/trailing/double slashes).
            if not p:
                continue
            # Descending *into* a submodule is not possible; report it.
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha

1325 

1326 

def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError("Timezone must start with + or - ({text})".format(**vars()))
    sign = text[:1]
    magnitude = int(text[1:])
    offset = -magnitude if sign == b"-" else magnitude
    # "-0000" (and friends) means UTC but must round-trip with its sign.
    pretend_negative_utc = sign == b"-" and offset >= 0
    negative = offset < 0
    offset = abs(offset)
    # The numeric value is HHMM, not a second count.
    hours, minutes = divmod(offset, 100)
    seconds = hours * 3600 + minutes * 60
    return (-seconds if negative else seconds, pretend_negative_utc)

1354 

1355 

def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    hours, remainder = divmod(offset, 3600)
    return f"{sign}{hours:02d}{remainder // 60:02d}".encode("ascii")

1372 

1373 

def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse event.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    # Line shape: b"Name <email> <timestamp> <timezone>"; the identity part
    # ends at the last "> " separator.
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No timestamp present; return the identity alone.
        return (value, None, (None, False))
    try:
        person = value[: sep + 1]
        timetext, timezonetext = value[sep + 2 :].rsplit(b" ", 1)
        when = int(timetext)
        tz_offset, tz_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, when, (tz_offset, tz_neg_utc)

1399 

1400 

def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event as b"<person> <timestamp> <timezone>"."""
    (timezone, timezone_neg_utc) = timezone_info
    return (
        person
        + b" "
        + str(time).encode("ascii")
        + b" "
        + format_timezone(timezone, timezone_neg_utc)
    )

1409 

1410 

# Deprecated via @replace_me; kept for backward compatibility. New code
# should use Commit._deserialize / the Commit object instead.
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    Raises:
      ObjectFormatException: if a header that requires a value has none
    """
    parents = []
    extra = []
    tree = None
    # (person, timestamp, (tz_offset, tz_neg_utc)); all None until parsed.
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # A mergetag header embeds a whole tag object, minus its
            # trailing newline; re-add it before parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # A None field marks the commit message body.
            message = value
        else:
            # Unknown headers are preserved verbatim for reserialization.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1489 

1490 

class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: Optional[bytes] = None
        self._mergetag: list[Tag] = []
        self._gpgsig: Optional[bytes] = None
        self._extra: list[tuple[bytes, Optional[bytes]]] = []
        self._author_timezone_neg_utc: Optional[bool] = False
        self._commit_timezone_neg_utc: Optional[bool] = False

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Commit":
        """Load a Commit from a loose-object file, rejecting other types."""
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(path)
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields from the serialized header/message chunks.

        Raises:
          ObjectFormatException: if author/committer headers lack a value
        """
        self._parents = []
        self._extra = []
        self._tree = None
        # (person, timestamp, (tz_offset, tz_neg_utc)); all None until parsed.
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # A mergetag header embeds a whole tag object, minus its
                # trailing newline; re-add it before parsing.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # A None field marks the commit message body.
                self._message = value
            else:
                # Unknown headers are kept verbatim so reserialization
                # round-trips.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Verify header ordering: tree, parent*, author, committer, encoding.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached, ASCII-armored GPG signature over this commit.

        The resulting signature is stored in ``self.gpgsig``.

        Args:
          keyid: Optional GPG key id to sign with; when omitted, gpg's
            default signing key is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                # A second context is opened so the resolved key is installed
                # as the signer.
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        # NOTE(review): both the slot and the property are cleared; the
        # property assignment alone appears sufficient — TODO confirm.
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # NOTE(review): the nested loop iterates ``keys`` twice and
                # never uses ``key``; ``subkey`` suggests ``key.subkeys`` may
                # have been intended — TODO confirm.
                for key in keys:
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message back into chunk form."""
        headers = []
        assert self._tree is not None
        # Accept either a Tree object or a raw hex sha in the slot.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the tag's trailing newline; _deserialize re-adds it.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    # Deprecated accessor kept for backward compatibility (see @replace_me).
    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

1807 

# All concrete object types, used to build the type-lookup table below.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps both the wire type name (bytes) and the pack type number (int)
# to the corresponding ShaFile subclass.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls


# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Rust extension not built/installed; the pure-Python versions stand.
    pass
else:
    # Shadow the pure-Python implementations with the faster Rust ones.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs