Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dulwich/objects.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

861 statements  

1# objects.py -- Access to base git objects 

2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net> 

3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk> 

4# 

5# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU 

6# General Public License as public by the Free Software Foundation; version 2.0 

7# or (at your option) any later version. You can redistribute it and/or 

8# modify it under the terms of either of these two licenses. 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15# 

16# You should have received a copy of the licenses; if not, see 

17# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License 

18# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache 

19# License, Version 2.0. 

20# 

21 

22"""Access to base git objects.""" 

23 

24import binascii 

25import os 

26import posixpath 

27import stat 

28import warnings 

29import zlib 

30from collections import namedtuple 

31from hashlib import sha1 

32from io import BytesIO 

33from typing import ( 

34 TYPE_CHECKING, 

35 BinaryIO, 

36 Dict, 

37 Iterable, 

38 Iterator, 

39 List, 

40 Optional, 

41 Tuple, 

42 Type, 

43 Union, 

44) 

45 

46from .errors import ( 

47 ChecksumMismatch, 

48 FileFormatException, 

49 NotBlobError, 

50 NotCommitError, 

51 NotTagError, 

52 NotTreeError, 

53 ObjectFormatException, 

54) 

55from .file import GitFile 

56 

if TYPE_CHECKING:
    from _hashlib import HASH

# All-zero hex sha used by git to denote a non-existent object.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# Mode used by git for submodule (gitlink) tree entries.
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Marker that introduces a detached PGP signature appended to a tag body.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"


# Alias for hex object ids (40 hex digits as bytes).
ObjectID = bytes

# Raised while parsing an on-disk object whose file contains no data at all.
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""

91 

92 

def S_ISGITLINK(m):
    """Determine whether a mode denotes a submodule (gitlink) entry.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    return S_IFGITLINK == stat.S_IFMT(m)

101 

102 

103def _decompress(string): 

104 dcomp = zlib.decompressobj() 

105 dcomped = dcomp.decompress(string) 

106 dcomped += dcomp.flush() 

107 return dcomped 

108 

109 

def sha_to_hex(sha):
    """Takes a string and returns the hex of the sha within."""
    result = binascii.hexlify(sha)
    # A 20-byte binary sha1 must hexlify to exactly 40 characters.
    assert len(result) == 40, f"Incorrect length of sha1 string: {result!r}"
    return result

115 

116 

def hex_to_sha(hex):
    """Takes a hex sha and returns a binary sha."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex}"
    try:
        return binascii.unhexlify(hex)
    except TypeError as exc:
        # For bytes input, report bad hex digits as a ValueError; for any
        # other input type, let the original TypeError propagate.
        if isinstance(hex, bytes):
            raise ValueError(exc.args[0]) from exc
        raise

126 

127 

def valid_hexsha(hex):
    """Return whether the argument is a well-formed 40-character hex sha."""
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
    except (TypeError, binascii.Error):
        return False
    return True

137 

138 

def hex_to_filename(path, hex):
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join accepts bytes or unicode, but all args must be of the same
    # type. Make sure that hex which is expected to be bytes, is the same type
    # as path.
    if type(path) is not type(hex) and getattr(path, "encode", None) is not None:
        hex = hex.decode("ascii")
    # Loose objects live under a two-character fan-out directory.
    return os.path.join(path, hex[:2], hex[2:])

150 

151 

def filename_to_hex(filename):
    """Takes an object filename and returns its corresponding hex sha.

    Args:
      filename: Path to a loose object (".../ab/<38 hex chars>").
    Returns: The 40-character hex sha as ASCII bytes.
    """
    # grab the last (up to) two path components
    names = filename.rsplit(os.path.sep, 2)[-2:]
    # BUG FIX: the error message had lost its interpolation and always read
    # "(unknown)"; include the offending filename again so failures are
    # diagnosable.
    errmsg = f"Invalid object filename: {filename}"
    assert len(names) == 2, errmsg
    base, rest = names
    assert len(base) == 2 and len(rest) == 38, errmsg
    hex = (base + rest).encode("ascii")
    hex_to_sha(hex)  # validates that the name really is hex
    return hex

163 

164 

def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError("unsupported class type num: %d" % num_type)
    # Git object headers have the form b"<type> <size>\0".
    return b"".join([cls.type_name, b" ", str(length).encode("ascii"), b"\0"])

171 

172 

def serializable_property(name: str, docstring: Optional[str] = None):
    """A property that helps tracking whether serialization is necessary."""
    attr = "_" + name

    def _getter(obj):
        return getattr(obj, attr)

    def _setter(obj, value):
        setattr(obj, attr, value)
        # Any change invalidates the cached serialized form.
        obj._needs_serialization = True

    return property(_getter, _setter, doc=docstring)

184 

185 

def object_class(type: Union[bytes, int]) -> Optional[Type["ShaFile"]]:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
      type is not a valid type name/number.
    """
    # _TYPE_MAP is keyed by both the textual and the numeric type.
    return _TYPE_MAP.get(type)

195 

196 

def check_hexsha(hex, error_msg):
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex}")

208 

209 

def check_identity(identity: bytes, error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    """
    open_bracket = identity.find(b"<")
    close_bracket = identity.find(b">")
    # NOTE: all conditions are evaluated eagerly (list, not generator), so
    # the indexing below runs even when the preceding checks fail — this
    # mirrors the original behavior exactly.
    conditions = [
        open_bracket >= 1,
        identity[open_bracket - 1] == b" "[0],
        identity.find(b"<", open_bracket + 1) == -1,
        close_bracket == len(identity) - 1,
        b"\0" not in identity,
        b"\n" not in identity,
    ]
    if not all(conditions):
        raise ObjectFormatException(error_msg)

232 

233 

def check_time(time_seconds):
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Prevent overflow error: timestamps above signed 64-bit max are rejected.
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")

246 

247 

def git_line(*items):
    """Formats items into a space separated line."""
    line = bytearray(b" ".join(items))
    line += b"\n"
    return bytes(line)

251 

252 

class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha) -> None:
        # Normalise str input to ASCII bytes before validating.
        if getattr(hexsha, "encode", None) is not None:
            hexsha = hexsha.encode("ascii")
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        self._sha = hex_to_sha(hexsha)

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

273 

274 

class ShaFile:
    """A git SHA file.

    Base class for all git object types.  Serialization is lazy: raw chunks
    and the computed sha are cached and invalidated when a serializable
    property changes (tracked via ``_needs_serialization``).
    """

    __slots__ = ("_chunked_text", "_sha", "_needs_serialization")

    # True when a property changed and the cached chunks/sha are stale.
    _needs_serialization: bool
    # Set by each subclass: the textual and numeric git type identifiers.
    type_name: bytes
    type_num: int
    _chunked_text: Optional[List[bytes]]
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(magic, f: BinaryIO) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep feeding compressed data until the decompressed header contains
        # the NUL that terminates the "<type> <size>\0" prefix.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL is the object payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> List[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached sha too; it is recomputed on demand.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        return self._chunked_text  # type: ignore

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self):
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: List[bytes], sha: Optional[ObjectID] = None
    ) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            # Trust the caller-supplied sha rather than recomputing it.
            self._sha = FixedSha(sha)  # type: ignore
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(magic, f):
        """Parse a new style object, creating it but not reading the file."""
        # The numeric type lives in bits 4-6 of the first byte.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException("Not a known type %d" % num_type)
        return obj_class()

    def _parse_object(self, map) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is varint-encoded: a set high bit means more bytes follow.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        # A legacy object starts with a plain zlib header: low nibble of the
        # first byte is 8 (deflate) and the first two bytes, read big-endian,
        # are a multiple of 31.
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f):
        # Reads the whole file, then dispatches on the header format.
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: List[bytes]) -> None:
        # Subclasses populate their attributes from the raw chunks.
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> List[bytes]:
        # Subclasses produce raw chunks from their attributes.
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path):
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f):
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(type_num, string, sha=None):
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError("unsupported class type num: %d" % type_num)
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: List[bytes], sha: Optional[ObjectID] = None
    ):
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError("unsupported class type num: %d" % type_num)
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string):
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member, error_msg):
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip through (de)serialization to surface format errors.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self):
        # The "<type> <size>\0" prefix hashed together with the contents.
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self):
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self):
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError("invalid type num %d" % self.type_num)
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self):
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} {self.id}>"

    def __ne__(self, other):
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other):
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other):
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other):
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id

584 

585 

class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: List[bytes]

    def __init__(self) -> None:
        super().__init__()
        self._chunked_text = []
        # A fresh, empty blob is already in serialized form.
        self._needs_serialization = False

    def _get_data(self):
        return self.as_raw_string()

    def _set_data(self, data):
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self):
        return self._chunked_text

    def _set_chunked(self, chunks: List[bytes]):
        self._chunked_text = chunks

    def _serialize(self):
        # Blobs are opaque: the chunks ARE the serialized form.
        return self._chunked_text

    def _deserialize(self, chunks):
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path):
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(path)
        return blob

    def check(self):
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> List[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            return chunks[0].splitlines(True)
        # Chunk boundaries may fall mid-line; carry the unterminated tail of
        # each chunk forward into the next one.
        carry = None
        result = []
        for piece in chunks:
            parts = piece.splitlines(True)
            if len(parts) > 1:
                result.append((carry or b"") + parts[0])
                result.extend(parts[1:-1])
                carry = parts[-1]
            elif len(parts) == 1:
                if carry is None:
                    carry = parts.pop()
                else:
                    carry += parts.pop()
        if carry is not None:
            result.append(carry)
        return result

670 

671 

def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[Union[Tuple[None, None], Tuple[Optional[bytes], bytes]]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
      order read from the text, possibly including duplicates. Includes a
      field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None  # name of the header currently being accumulated
    v = b""  # its (possibly multi-line) value so far
    eof = False

    def _strip_last_newline(value):
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)

    else:
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()

726 

727 

def _format_message(headers, body):
    """Serialize (field, value) headers plus an optional body as chunks.

    Continuation lines of a multi-line header value are emitted indented
    with a single space, matching the format read by _parse_message.
    """
    for field, value in headers:
        first, *continuations = value.split(b"\n")
        yield git_line(field, first)
        for continuation in continuations:
            yield b" " + continuation + b"\n"
    if body:
        yield b"\n"  # There must be a new line after the headers
        yield body

737 

738 

class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_tag_timezone_neg_utc",
        "_name",
        "_object_sha",
        "_object_class",
        "_tag_time",
        "_tag_timezone",
        "_tagger",
        "_message",
        "_signature",
    )

    _tagger: Optional[bytes]

    def __init__(self) -> None:
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        # True when the timezone is UTC but was written negatively ("-0000").
        self._tag_timezone_neg_utc = False
        self._signature = None

    @classmethod
    def from_path(cls, filename):
        tag = ShaFile.from_path(filename)
        if not isinstance(tag, cls):
            raise NotTagError(filename)
        return tag

    def check(self):
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        check_time(self._tag_time)

        # Enforce canonical header order: object, type, tag, then tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

    def _serialize(self):
        headers = []
        headers.append((_OBJECT_HEADER, self._object_sha))
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # A detached signature is stored appended to the message body.
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

    def _deserialize(self, chunks):
        """Grab the metadata attached to the tag."""
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # Freeform body: the message, possibly followed by an
                # appended detached PGP signature.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        self._message = value
                        self._signature = None
                    else:
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
            else:
                raise ObjectFormatException(f"Unknown field {field}")

    def _get_object(self):
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).
        """
        return (self._object_class, self._object_sha)

    def _set_object(self, value):
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds "
        "since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

    def sign(self, keyid: Optional[str] = None):
        """Create a detached GPG signature over this tag and store it.

        Args:
          keyid: Optional key id to sign with; when omitted, the default
            signer of the GPG context is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.signature, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.signature, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # The signed payload is the raw tag minus the appended signature.
            data, result = ctx.verify(
                self.as_raw_string()[: -len(self._signature)],
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the inner loop iterates over `keys`
                    # again, not `key.subkeys` — confirm this is intentional.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

951 

952 

class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
    """Named tuple encapsulating a single tree entry."""

    def in_path(self, path: bytes):
        """Return a copy of this entry with the given path prepended."""
        if not isinstance(self.path, bytes):
            raise TypeError(f"Expected bytes for path, got {path!r}")
        combined = posixpath.join(path, self.path)
        return TreeEntry(combined, self.mode, self.sha)

961 

962 

def parse_tree(text, strict=False):
    """Parse a tree text.

    Args:
      text: Serialized text to parse
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    offset = 0
    end = len(text)
    # Each entry is "<octal mode> <name>\0<20-byte binary sha>".
    while offset < end:
        mode_end = text.index(b" ", offset)
        mode_text = text[offset:mode_end]
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode '{mode_text}'")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode '{mode_text}'") from exc
        name_end = text.index(b"\0", mode_end)
        name = text[mode_end + 1 : name_end]
        offset = name_end + 21
        sha = text[name_end + 1 : offset]
        if len(sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(sha))

992 

993 

def serialize_tree(items):
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        mode_bytes = (f"{mode:04o}").encode("ascii")
        yield b"".join([mode_bytes, b" ", name, b"\0", hex_to_sha(hexsha)])

1005 

1006 

def sorted_tree_items(entries, name_order: bool):
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, entry in sorted(entries.items(), key=key_func):
        mode, hexsha = entry
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)

1028 

1029 

def key_entry(entry) -> bytes:
    """Sort key for tree entry.

    Directories sort as if their name had a trailing '/', matching git's
    tree ordering.

    Args:
      entry: (name, value) tuple
    """
    (name, value) = entry
    # value[0] is the mode; directory entries compare with '/' appended.
    if stat.S_ISDIR(value[0]):
        return name + b"/"
    return name

1040 

1041 

def key_entry_name_order(entry):
    """Sort key for tree entry in name order (no '/' suffix for dirs)."""
    name, _value = entry
    return name

1045 

1046 

def pretty_format_tree_entry(name, mode, hexsha, encoding="utf-8") -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Encoding used to decode the entry name for display.
    Returns: string describing the tree entry
    """
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    decoded_name = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {hexsha.decode('ascii')}\t{decoded_name}\n"

1066 

1067 

class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path, sha) -> None:
        # Expose where resolution stopped (the gitlink entry) and its SHA.
        # Exception.args is populated by BaseException.__new__, so no
        # super().__init__() call is needed here.
        self.sha = sha
        self.path = path

1074 

1075 

class Tree(ShaFile):
    """A Git tree object.

    Maps entry names (bytes) to (mode, hexsha) tuples.
    """

    type_name = b"tree"
    type_num = 2

    # A bare string is valid for a single slot, but a tuple is the
    # conventional form and matches the other ShaFile subclasses.
    __slots__ = ("_entries",)

    def __init__(self) -> None:
        super().__init__()
        self._entries: Dict[bytes, Tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename):
        """Load a tree from a file on disk.

        Raises:
          NotTreeError: if the object at filename is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(filename)
        return tree

    def __contains__(self, name) -> bool:
        return name in self._entries

    def __getitem__(self, name):
        return self._entries[name]

    def __setitem__(self, name, value) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name) -> None:
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        return len(self._entries)

    def __iter__(self):
        return iter(self._entries)

    def add(self, name, mode, hexsha):
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order=False):
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self):
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks):
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self):
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            entry = (name, (mode, sha))
            if last:
                # Entries must be sorted in git tree order and unique.
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name}")
            last = entry

    def _serialize(self):
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable listing of the tree, one entry per line."""
        text: List[str] = []
        for name, mode, hexsha in self.iteritems():
            text.append(pretty_format_tree_entry(name, mode, hexsha))
        return "".join(text)

    def lookup_path(self, lookup_obj, path):
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.
        Raises:
          NotTreeError: if an intermediate path component is not a tree
          SubmoduleEncountered: if a gitlink entry is hit mid-path
        """
        parts = path.split(b"/")
        sha = self.id
        mode = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/trailing/double slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        return mode, sha

1230 

1231 

def parse_timezone(text):
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    Raises:
      ValueError: if the sign prefix or offset digits are invalid.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        # f-string replaces the old '.format(**vars())' trick; same message.
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # Offset is encoded as HHMM; use integer division (no float round-trip).
    hours = offset // 100
    minutes = offset % 100
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )

1259 

1260 

def format_timezone(offset, unnecessary_negative_timezone=False):
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    Returns: Timezone as ASCII bytes, e.g. b"+0100".
    Raises:
      ValueError: if offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Integer division (//) instead of feeding floats through %d.
    return ("%c%02d%02d" % (sign, offset // 3600, (offset // 60) % 60)).encode("ascii")

1277 

1278 

def parse_time_entry(value):
    """Parse an author/committer/tagger event line.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
        field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is the identity, no timestamp.
        return (value, None, (None, False))
    person = value[: sep + 1]
    rest = value[sep + 2 :]
    try:
        timetext, timezonetext = rest.rsplit(b" ", 1)
        time = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, time, (timezone, timezone_neg_utc)

1302 

1303 

def format_time_entry(person, time, timezone_info):
    """Format an identity plus timestamp/timezone for serialization."""
    (timezone, timezone_neg_utc) = timezone_info
    tz_bytes = format_timezone(timezone, timezone_neg_utc)
    return person + b" " + str(time).encode("ascii") + b" " + tz_bytes

1310 

1311 

def parse_commit(chunks):
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    warnings.warn("parse_commit will be removed in 0.22", DeprecationWarning)
    tree = None
    message = None
    encoding = None
    gpgsig = None
    parents = []
    mergetag = []
    extra = []
    author_info = (None, None, (None, None))
    commit_info = (None, None, (None, None))

    for key, val in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if key is None:
            # A None field marks the commit message body.
            message = val
        elif key == _TREE_HEADER:
            tree = val
        elif key == _PARENT_HEADER:
            parents.append(val)
        elif key == _AUTHOR_HEADER:
            author_info = parse_time_entry(val)
        elif key == _COMMITTER_HEADER:
            commit_info = parse_time_entry(val)
        elif key == _ENCODING_HEADER:
            encoding = val
        elif key == _MERGETAG_HEADER:
            mergetag.append(Tag.from_string(val + b"\n"))
        elif key == _GPGSIG_HEADER:
            gpgsig = val
        else:
            # Unknown headers are preserved verbatim.
            extra.append((key, val))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )

1362 

1363 

class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_parents",
        "_encoding",
        "_extra",
        "_author_timezone_neg_utc",
        "_commit_timezone_neg_utc",
        "_commit_time",
        "_author_time",
        "_author_timezone",
        "_commit_timezone",
        "_author",
        "_committer",
        "_tree",
        "_message",
        "_mergetag",
        "_gpgsig",
    )

    def __init__(self) -> None:
        super().__init__()
        self._parents: List[bytes] = []
        self._encoding = None
        self._mergetag: List[Tag] = []
        self._gpgsig = None
        self._extra: List[Tuple[bytes, bytes]] = []
        self._author_timezone_neg_utc = False
        self._commit_timezone_neg_utc = False

    @classmethod
    def from_path(cls, path):
        """Load a commit from a file on disk.

        Raises:
          NotCommitError: if the object at path is not a commit
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(path)
        return commit

    def _deserialize(self, chunks):
        """Populate commit fields from serialized chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        author_info = (None, None, (None, None))
        commit_info = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                self._mergetag.append(Tag.from_string(value + b"\n"))
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                self._message = value
            else:
                # Unknown headers are preserved verbatim for reserialization.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self):
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        check_hexsha(self._tree, "invalid tree sha")

        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce the canonical header ordering:
        # tree, parent*, author, committer, encoding?
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None):
        """Sign this commit with GPG, storing the signature in gpgsig.

        Args:
          keyid: Optional key id to sign with; defaults to the default key.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def verify(self, keyids: Optional[Iterable[str]] = None):
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # Verify against the commit contents minus the gpgsig header.
            self_without_gpgsig = self.copy()
            self_without_gpgsig._gpgsig = None
            self_without_gpgsig.gpgsig = None
            data, result = ctx.verify(
                self_without_gpgsig.as_raw_string(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # Fix: iterate each trusted key's subkeys (previously the
                    # inner loop iterated ``keys`` again, leaving ``key``
                    # unused and ignoring non-primary signing subkeys).
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self):
        """Serialize commit headers and message in canonical order."""
        headers = []
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Drop the trailing newline added by as_raw_string().
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(self._extra)
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self):
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value):
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    def _get_extra(self):
        """Return extra settings of this commit."""
        warnings.warn(
            "Commit.extra is deprecated. Use Commit._extra instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the " "epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")

1649 

1650 

# All concrete ShaFile subclasses, used to build the type lookup table below.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps both the textual type name (e.g. b"commit") and the numeric type id
# (e.g. 1) of each object class to the class itself.
_TYPE_MAP: Dict[Union[bytes, int], Type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls

1663 

1664 

# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import parse_tree, sorted_tree_items  # type: ignore
except ImportError:
    # Extension not built/installed: keep the pure-Python implementations.
    pass