Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# objects.py -- Access to base git objects
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Access to base git objects."""
25import binascii
26import os
27import posixpath
28import stat
29import sys
30import zlib
31from collections import namedtuple
32from collections.abc import Callable, Iterable, Iterator
33from hashlib import sha1
34from io import BufferedIOBase, BytesIO
35from typing import (
36 IO,
37 TYPE_CHECKING,
38 Optional,
39 Union,
40)
42if sys.version_info >= (3, 11):
43 from typing import Self
44else:
45 from typing_extensions import Self
47if sys.version_info >= (3, 10):
48 from typing import TypeGuard
49else:
50 from typing_extensions import TypeGuard
52from . import replace_me
53from .errors import (
54 ChecksumMismatch,
55 FileFormatException,
56 NotBlobError,
57 NotCommitError,
58 NotTagError,
59 NotTreeError,
60 ObjectFormatException,
61)
62from .file import GitFile
64if TYPE_CHECKING:
65 from _hashlib import HASH
67 from .file import _GitFile
# All-zeros hex sha, used to denote a nonexistent/absent object.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects (used when parsing/serializing tags)
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"

# Tree entry mode used for submodules (gitlinks).
S_IFGITLINK = 0o160000

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"

# Type alias for hex object ids.
ObjectID = bytes
# Raised by ShaFile._parse_file when an object file contains no data at all.
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""
def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    # A gitlink entry records a commit sha inside a tree (submodule pointer).
    file_type = stat.S_IFMT(m)
    return file_type == S_IFGITLINK
112def _decompress(string: bytes) -> bytes:
113 dcomp = zlib.decompressobj()
114 dcomped = dcomp.decompress(string)
115 dcomped += dcomp.flush()
116 return dcomped
def sha_to_hex(sha: "ObjectID") -> bytes:
    """Takes a string and returns the hex of the sha within."""
    # bytes.hex() is equivalent to binascii.hexlify for bytes input.
    hexsha = sha.hex().encode("ascii")
    assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}"
    return hexsha
def hex_to_sha(hex: Union[bytes, str]) -> bytes:
    """Takes a hex sha and returns a binary sha."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        return binascii.unhexlify(hex)
    except TypeError as err:
        # Only translate the exception for bytes input; anything else
        # propagates the original TypeError unchanged.
        if isinstance(hex, bytes):
            raise ValueError(err.args[0]) from err
        raise
def valid_hexsha(hex: Union[bytes, str]) -> bool:
    """Return True if the argument is a well-formed 40-character hex sha."""
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
    except (TypeError, binascii.Error):
        return False
    return True
def hex_to_filename(
    path: Union[str, bytes], hex: Union[str, bytes]
) -> Union[str, bytes]:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join requires all parts to share one type; hex is normally
    # bytes, so convert it when the base path is a str.
    if type(path) is not type(hex) and isinstance(path, str):
        hex = hex.decode("ascii")  # type: ignore
    # Objects are fanned out into directories named by the first two digits.
    return os.path.join(path, hex[:2], hex[2:])  # type: ignore
def filename_to_hex(filename: Union[str, bytes]) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # An object lives at <2-char prefix>/<38-char remainder>; grab the last
    # (up to) two path components.
    components = filename.rsplit(os.path.sep, 2)[-2:]  # type: ignore
    errmsg = f"Invalid object filename: {filename!r}"
    assert len(components) == 2, errmsg
    prefix, suffix = components
    assert len(prefix) == 2 and len(suffix) == 38, errmsg
    hex_bytes = (prefix + suffix).encode("ascii")  # type: ignore
    # Round-trip through hex_to_sha purely to validate the hex digits.
    hex_to_sha(hex_bytes)
    return hex_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    obj_cls = object_class(num_type)
    if obj_cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    # Header layout: "<type name> <decimal length>\0"
    return b"".join([obj_cls.type_name, b" ", str(length).encode("ascii"), b"\0"])
def serializable_property(name: str, docstring: Optional[str] = None) -> property:
    """A property that helps tracking whether serialization is necessary."""
    attr = "_" + name

    def _set(obj: "ShaFile", value: object) -> None:
        # Any attribute change invalidates the cached serialization.
        setattr(obj, attr, value)
        obj._needs_serialization = True

    def _get(obj: "ShaFile") -> object:
        return getattr(obj, attr)

    return property(_get, _set, doc=docstring)
def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
      type is not a valid type name/number.
    """
    # Missing keys yield None (dict.get's default).
    return _TYPE_MAP.get(type)
def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
def check_identity(identity: Optional[bytes], error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    Args:
      identity: Identity string of the form b"Name <email>"
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is missing or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Evaluate the checks lazily (short-circuit) so that a missing "<"
    # cannot trigger an IndexError on identity[email_start - 1]: the
    # previous eager all([...]) crashed on inputs like b"" or b"x".
    if (
        email_start < 1
        or identity[email_start - 1] != b" "[0]
        or identity.find(b"<", email_start + 1) != -1
        or email_end != len(identity) - 1
        or b"\0" in identity
        or b"\n" in identity
    ):
        raise ObjectFormatException(error_msg)
247def _path_to_bytes(path: Union[str, bytes]) -> bytes:
248 """Convert a path to bytes for use in error messages."""
249 if isinstance(path, str):
250 return path.encode("utf-8", "surrogateescape")
251 return path
def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    """
    # Reject values that would overflow a signed 64-bit integer.
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    joined = b" ".join(items)
    return joined + b"\n"
class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: Union[str, bytes]) -> None:
        # Accept str for convenience; canonical storage is ASCII bytes.
        if isinstance(hexsha, str):
            hexsha = hexsha.encode("ascii")
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        # hex_to_sha validates length/characters as a side effect.
        self._sha = hex_to_sha(hexsha)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")
# Type guard functions for runtime type narrowing.
#
# The same four predicates are defined twice: under TYPE_CHECKING they are
# annotated with TypeGuard so static checkers can narrow a ShaFile to the
# concrete subclass; at runtime they are plain bool-returning functions.
# Both branches must stay behaviorally identical.
if TYPE_CHECKING:

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
class ShaFile:
    """A git SHA file.

    Base class for the git object types.  Subclasses set ``type_name`` and
    ``type_num`` and implement ``_serialize``/``_deserialize``.  Contents
    are held as a list of byte chunks; the SHA1 is computed lazily and
    cached in ``_sha``.
    """

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when attributes changed since the chunk cache was built.
    _needs_serialization: bool
    # Byte name of the object type (e.g. b"blob"); set by subclasses.
    type_name: bytes
    # Numeric object type; set by subclasses.
    type_num: int
    # Cached serialized representation, as a list of byte chunks.
    _chunked_text: Optional[list[bytes]]
    # Cached SHA object (real hash, fixed value, or None when stale).
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Inflate until the NUL terminating the "<type> <size>" header
        # appears.
        # NOTE(review): if the stream is truncated before any NUL, f.read()
        # returns b"" forever and this loop does not terminate — confirm
        # callers always supply well-formed streams.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL is the object payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level (-1 for zlib's default)
        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA before re-serializing.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        return self._chunked_text  # type: ignore

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
        """Set the contents of this object from a serialized string.

        Args:
          text: Raw uncompressed contents
          sha: Optional known hex sha (stored without re-verification)
        """
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> None:
        """Set the contents of this object from a list of chunks.

        Args:
          chunks: Raw uncompressed contents as chunks
          sha: Optional known hex sha; wrapped in FixedSha without hashing
        """
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Bits 4-6 of the first byte encode the numeric object type.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is varint-encoded: a set high bit means more bytes follow.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True if the first two bytes look like a zlib stream header."""
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        # zlib streams start with CM=8 (deflate) and a 16-bit check word
        # divisible by 31.
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Read and parse one object from an open file, legacy or new style.

        Raises:
          EmptyFileException: if the file contains no data
        """
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        # Subclasses populate their attributes from serialized chunks.
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        # Subclasses produce serialized chunks from their attributes.
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk.

        Raises:
          ObjectFormatException: if the header cannot be parsed
        """
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip through deserialize/serialize and verify the sha
            # comes out the same.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        # "<type name> <length>\0" header used for hashing and legacy output.
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return a debug representation including the hex sha."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id
class Blob(ShaFile):
    """A Git Blob object, wrapping raw file contents."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        super().__init__()
        # A fresh blob has empty contents and is already "serialized".
        self._chunked_text = []
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    def _serialize(self) -> list[bytes]:
        # A blob serializes to exactly its stored chunks.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Blob":
        """Read a blob from disk, raising NotBlobError for other object types."""
        obj = ShaFile.from_path(path)
        if not isinstance(obj, cls):
            raise NotBlobError(_path_to_bytes(path))
        return obj

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            return chunks[0].splitlines(True)
        # Stitch together lines that straddle chunk boundaries: `carry`
        # holds the (possibly partial) last line seen so far.
        carry = None
        result: list[bytes] = []
        for piece in chunks:
            piece_lines = piece.splitlines(True)
            if len(piece_lines) > 1:
                result.append((carry or b"") + piece_lines[0])
                result.extend(piece_lines[1:-1])
                carry = piece_lines[-1]
            elif len(piece_lines) == 1:
                carry = piece_lines[0] if carry is None else carry + piece_lines[0]
        if carry is not None:
            result.append(carry)
        return result
735def _parse_message(
736 chunks: Iterable[bytes],
737) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]:
738 """Parse a message with a list of fields and a body.
740 Args:
741 chunks: the raw chunks of the tag or commit object.
742 Returns: iterator of tuples of (field, value), one per header line, in the
743 order read from the text, possibly including duplicates. Includes a
744 field named None for the freeform tag/commit text.
745 """
746 f = BytesIO(b"".join(chunks))
747 k = None
748 v = b""
749 eof = False
751 def _strip_last_newline(value: bytes) -> bytes:
752 """Strip the last newline from value."""
753 if value and value.endswith(b"\n"):
754 return value[:-1]
755 return value
757 # Parse the headers
758 #
759 # Headers can contain newlines. The next line is indented with a space.
760 # We store the latest key as 'k', and the accumulated value as 'v'.
761 for line in f:
762 if line.startswith(b" "):
763 # Indented continuation of the previous line
764 v += line[1:]
765 else:
766 if k is not None:
767 # We parsed a new header, return its value
768 yield (k, _strip_last_newline(v))
769 if line == b"\n":
770 # Empty line indicates end of headers
771 break
772 (k, v) = line.split(b" ", 1)
774 else:
775 # We reached end of file before the headers ended. We still need to
776 # return the previous header, then we need to return a None field for
777 # the text.
778 eof = True
779 if k is not None:
780 yield (k, _strip_last_newline(v))
781 yield (None, None)
783 if not eof:
784 # We didn't reach the end of file while parsing headers. We can return
785 # the rest of the file as a message.
786 yield (None, f.read())
788 f.close()
def _format_message(
    headers: list[tuple[bytes, bytes]], body: Optional[bytes]
) -> Iterator[bytes]:
    """Serialize (field, value) headers and an optional body into chunks."""
    for field, value in headers:
        value_lines = value.split(b"\n")
        # First line carries "<field> <value>"; subsequent lines become
        # space-indented continuations.
        yield git_line(field, value_lines[0])
        for continuation in value_lines[1:]:
            yield b" " + continuation + b"\n"
    yield b"\n"  # There must be a new line after the headers
    if body:
        yield body
class Tag(ShaFile):
    """A Git Tag object (annotated tag)."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    # Tag message body (excluding any trailing signature).
    _message: Optional[bytes]
    # Name of the tag.
    _name: Optional[bytes]
    # ShaFile subclass of the tagged object.
    _object_class: Optional[type["ShaFile"]]
    # Hex sha of the tagged object.
    _object_sha: Optional[bytes]
    # Detached PGP signature, if any.
    _signature: Optional[bytes]
    # Tag creation time, seconds since the epoch.
    _tag_time: Optional[int]
    # Timezone of the tag time (format per parse_time_entry, defined elsewhere).
    _tag_timezone: Optional[int]
    # Whether the timezone is the negative-UTC special case.
    _tag_timezone_neg_utc: Optional[bool]
    # Identity of the tagger.
    _tagger: Optional[bytes]

    def __init__(self) -> None:
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: Optional[bytes] = None
        # NOTE(review): _name, _object_sha, _object_class and _message are
        # not initialized here; they are only set via _deserialize or the
        # properties — confirm accessors are not hit before then.

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tag":
        """Read a tag from disk, raising NotTagError for other object types."""
        tag = ShaFile.from_path(filename)
        if not isinstance(tag, cls):
            raise NotTagError(_path_to_bytes(filename))
        return tag

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce the canonical header order: object, type, tag, tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

    def _serialize(self) -> list[bytes]:
        """Serialize the tag's headers, message and signature into chunks."""
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp: emit the identity alone.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # The detached signature, when present, is appended to the body.
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # The body: message text, optionally followed by a PGP
                # signature starting at BEGIN_PGP_SIGNATURE.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        self._message = value
                        self._signature = None
                    else:
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

    def _get_object(self) -> tuple[type[ShaFile], bytes]:
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).
        """
        if self._object_class is None or self._object_sha is None:
            raise ValueError("Tag object is not properly initialized")
        return (self._object_class, self._object_sha)

    def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached GPG signature and store it on this tag.

        Args:
          keyid: Optional GPG key id to sign with; when omitted, gpg's
            default signing behavior is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.signature, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.signature, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.signature is a signature for the returned raw byte string serialization.
        """
        ret = self.as_raw_string()
        if self._signature:
            # The signature is always the trailing part of the serialization.
            ret = ret[: -len(self._signature)]
        return ret

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the inner loop iterates `keys` rather than
                    # `key.subkeys`, so the outer `key` is unused and the
                    # nesting is redundant — confirm this is intended.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
    """Named tuple encapsulating a single tree entry.

    Fields:
      path: Entry name (bytes).
      mode: Entry file mode (int).
      sha: SHA of the referenced object (bytes).
    """

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Path prefix (bytes) to join in front of this entry's path.
        Returns: A new TreeEntry with the joined path; mode and sha unchanged.
        Raises:
          TypeError: if this entry's path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Report the offending value (self.path); the original message
            # mistakenly interpolated the `path` argument instead.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, reject zero-padded octal modes (as git fsck does).
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    # Each entry is "<octal mode> <name>\0<20-byte binary sha>".
    while pos < end:
        space_at = text.index(b" ", pos)
        mode_bytes = text[pos:space_at]
        if strict and mode_bytes.startswith(b"0"):
            # Zero-padded modes are not canonical.
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}")
        try:
            mode = int(mode_bytes, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}") from exc
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]
        pos = nul_at + 21
        raw_sha = text[nul_at + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))
def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # Wire format per entry: "<octal mode> <name>\0<20-byte binary sha>".
        mode_field = ("%04o" % mode).encode("ascii")
        yield mode_field + b" " + name + b"\0" + hex_to_sha(hexsha)
def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    sort_key = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=sort_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry, in tree order.

    Directories sort as if their name carried a trailing slash.

    Args:
      entry: (name, (mode, sha)) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry in plain name order (no slash suffix)."""
    name, _value = entry
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Encoding used to decode the entry name for display.
    Returns: string describing the tree entry
    """
    # Any mode with the directory bit set is labelled "tree".
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path.

    Carries the tree path at which the submodule (gitlink) entry sits and
    the SHA that entry points to.
    """

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record the submodule location.

        Args:
          path: Tree path of the gitlink entry.
          sha: SHA the gitlink entry points to.
        """
        self.sha = sha
        self.path = path
class Tree(ShaFile):
    """A Git tree object.

    Maps entry names (bytes) to (mode, hexsha) tuples and handles
    (de)serialization of the canonical tree wire format.
    """

    type_name = b"tree"
    type_num = 2

    # Single slot: the name -> (mode, hexsha) mapping.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Create an empty tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Read a tree from a file on disk.

        Raises:
          NotTreeError: if the object read from filename is not a tree.
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check whether an entry with the given name exists."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Return the (mode, hexsha) tuple for the named entry."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        # Mark the cached serialization stale.
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Remove the named entry and mark the tree dirty."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return the number of entries in this tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over entry names (in dict order, not tree order)."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # The only entry modes git accepts in a tree.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        # Re-parse strictly so zero-padded modes are rejected.
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be unique and sorted in tree order.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize the entries, in tree order, to wire-format chunks."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable, ls-tree-like listing of the entries."""
        text: list[str] = []
        for name, mode, hexsha in self.iteritems():
            text.append(pretty_format_tree_entry(name, mode, hexsha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.
        Raises:
          SubmoduleEncountered: if a gitlink entry is hit before the final
            path component.
          NotTreeError: if an intermediate component is not a tree.
          ValueError: if the path contains no usable components.
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading or doubled slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Cannot descend into a submodule; report where it was hit.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    Raises:
      ValueError: if the fragment does not start with '+' or '-'.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # "-0000" is semantically UTC but must round-trip with its minus sign.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    # The numeric part is HHMM, so split on base 100.
    hours, minutes = divmod(abs(offset), 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    Raises:
      ValueError: if offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    negative = offset < 0 or unnecessary_negative_timezone
    sign = b"-" if negative else b"+"
    total_minutes = abs(offset) // 60
    hours, minutes = divmod(total_minutes, 60)
    return sign + b"%02d%02d" % (hours, minutes)
def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse an author/committer/tagger event line.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException: in case of parsing error (malformed
        field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is the identity, with no
        # timestamp information attached.
        return (value, None, (None, False))
    person = value[: sep + 1]
    rest = value[sep + 2 :]
    try:
        timetext, timezonetext = rest.rsplit(b" ", 1)
        time = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, time, (timezone, timezone_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event (author/committer/tagger) field value.

    Args:
      person: Identity bytes (name and email).
      time: Timestamp as seconds since the epoch.
      timezone_info: Tuple of (offset seconds, negative-UTC flag).
    Returns: Serialized b"person time timezone" bytes.
    """
    timezone, timezone_neg_utc = timezone_info
    fields = [
        person,
        str(time).encode("ascii"),
        format_timezone(timezone, timezone_neg_utc),
    ]
    return b" ".join(fields)
# Deprecated: superseded by Commit._deserialize; kept for API compatibility.
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    Raises:
      ObjectFormatException: if a known header is present without a value.
    """
    parents = []
    extra = []
    tree = None
    # (person, time, (timezone, timezone_neg_utc)) placeholders until the
    # corresponding headers are seen.
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Embedded tag: re-add the newline stripped during header parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # A None field marks the commit message body.
            message = value
        else:
            # Unknown headers are kept verbatim so they can be reserialized.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )
class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Create an empty commit.

        Remaining slots are filled in by _deserialize or by property
        assignment before serialization.
        """
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: Optional[bytes] = None
        self._mergetag: list[Tag] = []
        self._gpgsig: Optional[bytes] = None
        self._extra: list[tuple[bytes, Optional[bytes]]] = []
        self._author_timezone_neg_utc: Optional[bool] = False
        self._commit_timezone_neg_utc: Optional[bool] = False

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Commit":
        """Read a commit from a file on disk.

        Raises:
          NotCommitError: if the object read from path is not a commit.
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields from serialized chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        # (person, time, (timezone, timezone_neg_utc)) placeholders until
        # the corresponding headers are seen.
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Embedded tag: restore the newline lost in header parsing.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # A None field marks the commit message body.
                self._message = value
            else:
                # Unknown headers are kept verbatim for reserialization.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce the canonical header ordering:
        # tree, parent*, author, committer, encoding?.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None) -> None:
        """Attach a detached, armored GPG signature to this commit.

        Args:
          keyid: Optional id of the key to sign with; when omitted, the
            gpg default signing key is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Serialize a copy with the signature cleared.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        # NOTE(review): this property assignment appears to make the direct
        # slot write above redundant — confirm against serializable_property.
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        # Unsigned commits trivially pass verification.
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises gpg.errors.BadSignatures on an invalid
            # signature; reaching past it means the signature itself is good.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # NOTE(review): the inner loop iterates over ``keys`` again
                # rather than ``key.subkeys``, leaving the outer ``key``
                # unused — confirm whether subkeys were intended here.
                for key in keys:
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize commit headers and message into wire-format chunks."""
        headers = []
        assert self._tree is not None
        # Accept either a Tree object or a raw hexsha for the tree field.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the trailing newline; _format_message adds it back.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
# All concrete ShaFile subclasses that can appear in a git object store.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Lookup table keyed by both the textual type name (e.g. b"commit") and the
# numeric type code, mapping to the corresponding class.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls
# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Rust extension not built/installed; keep the pure-Python versions.
    pass
else:
    # Shadow the Python implementations with the faster Rust ones.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs