Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# objects.py -- Access to base git objects
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as public by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Access to base git objects."""
25import binascii
26import os
27import posixpath
28import stat
29import zlib
30from collections import namedtuple
31from collections.abc import Callable, Iterable, Iterator
32from hashlib import sha1
33from io import BufferedIOBase, BytesIO
34from typing import (
35 IO,
36 TYPE_CHECKING,
37 Optional,
38 Union,
39)
41try:
42 from typing import TypeGuard # type: ignore
43except ImportError:
44 from typing_extensions import TypeGuard
46from . import replace_me
47from .errors import (
48 ChecksumMismatch,
49 FileFormatException,
50 NotBlobError,
51 NotCommitError,
52 NotTagError,
53 NotTreeError,
54 ObjectFormatException,
55)
56from .file import GitFile
58if TYPE_CHECKING:
59 from _hashlib import HASH
61 from .file import _GitFile
# The all-zero hex sha, conventionally used to denote the absence of an object.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"

# File mode used by git for submodule (gitlink) tree entries.
S_IFGITLINK = 0o160000

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"

# Type alias for hex object ids (40 hex bytes).
ObjectID = bytes
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""

    # Raised e.g. by ShaFile._parse_file when a loose object file is empty.
def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    file_type = stat.S_IFMT(m)
    return file_type == S_IFGITLINK
def _decompress(string: bytes) -> bytes:
    """Inflate a complete zlib stream and return the decompressed bytes."""
    decompressor = zlib.decompressobj()
    return decompressor.decompress(string) + decompressor.flush()
def sha_to_hex(sha: ObjectID) -> bytes:
    """Convert a binary SHA digest to its 40-byte hex representation."""
    hex_digest = binascii.hexlify(sha)
    assert len(hex_digest) == 40, f"Incorrect length of sha1 string: {hex_digest!r}"
    return hex_digest
def hex_to_sha(hex: Union[bytes, str]) -> bytes:
    """Convert a 40-character hex sha to its 20-byte binary form."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        return binascii.unhexlify(hex)
    except TypeError as exc:
        # For bytes input, report malformed hex as ValueError; for any
        # other input type, let the original TypeError propagate.
        if isinstance(hex, bytes):
            raise ValueError(exc.args[0]) from exc
        raise
def valid_hexsha(hex: Union[bytes, str]) -> bool:
    """Return True if *hex* is a well-formed 40-character hex sha."""
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
    except (TypeError, binascii.Error):
        return False
    return True
def hex_to_filename(
    path: Union[str, bytes], hex: Union[str, bytes]
) -> Union[str, bytes]:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join requires all of its arguments to be of the same type.
    # When the caller mixes a str path with a bytes hex, decode the hex
    # so it matches the path.
    if type(path) is not type(hex) and getattr(path, "encode", None) is not None:
        hex = hex.decode("ascii")  # type: ignore
    # The first two hex digits select the fan-out directory; the remaining
    # 38 form the file name inside it.
    return os.path.join(path, hex[:2], hex[2:])  # type: ignore
def filename_to_hex(filename: Union[str, bytes]) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # A loose object lives at .../<aa>/<38 chars>; recover those two parts.
    parts = filename.rsplit(os.path.sep, 2)[-2:]  # type: ignore
    errmsg = f"Invalid object filename: {filename!r}"
    assert len(parts) == 2, errmsg
    directory, basename = parts
    assert len(directory) == 2 and len(basename) == 38, errmsg
    hex_bytes = (directory + basename).encode("ascii")  # type: ignore
    # Round-trip through hex_to_sha purely to validate the hex digits.
    hex_to_sha(hex_bytes)
    return hex_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    return b"".join([cls.type_name, b" ", str(length).encode("ascii"), b"\0"])
def serializable_property(name: str, docstring: Optional[str] = None) -> property:
    """A property that helps tracking whether serialization is necessary."""
    attr = "_" + name

    def _getter(obj: "ShaFile") -> object:
        return getattr(obj, attr)

    def _setter(obj: "ShaFile", value: object) -> None:
        setattr(obj, attr, value)
        # Any assignment invalidates the cached serialization.
        obj._needs_serialization = True

    return property(_getter, _setter, doc=docstring)
def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)
def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
def check_identity(identity: Optional[bytes], error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is missing or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Use a short-circuiting chain rather than all([...]): the eager list
    # form evaluated identity[email_start - 1] even when no b"<" was found
    # (email_start == -1), which raised IndexError for identities shorter
    # than two bytes instead of the intended ObjectFormatException.
    if not (
        email_start >= 1
        # The byte immediately before b"<" must be a space.
        and identity[email_start - 1] == b" "[0]
        # Only a single b"<" is allowed.
        and identity.find(b"<", email_start + 1) == -1
        # b">" must be the final byte.
        and email_end == len(identity) - 1
        and b"\0" not in identity
        and b"\n" not in identity
    ):
        raise ObjectFormatException(error_msg)
def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds
    """
    # Prevent overflow error
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Join byte items with single spaces and terminate with a newline."""
    joined = b" ".join(items)
    return joined + b"\n"
class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: Union[str, bytes]) -> None:
        """Store *hexsha* (str or bytes) and its binary counterpart."""
        encode = getattr(hexsha, "encode", None)
        if encode is not None:
            # str (or str-like) input: normalize to ASCII bytes.
            hexsha = encode("ascii")  # type: ignore
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        self._sha = hex_to_sha(hexsha)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")
# Type guard functions for runtime type narrowing
#
# The functions are defined twice on purpose: the TYPE_CHECKING branch carries
# TypeGuard return annotations so static checkers can narrow ShaFile to the
# concrete subclass, while the runtime branch returns a plain bool.
if TYPE_CHECKING:

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when the cached chunks are stale and must be rebuilt by _serialize().
    _needs_serialization: bool
    # Byte name of the object type (e.g. b"blob"); set by subclasses.
    type_name: bytes
    # Numeric object type; set by subclasses.
    type_num: int
    # Cached serialized representation as a list of byte chunks.
    _chunked_text: Optional[list[bytes]]
    # Cached sha object; None means it must be recomputed.
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the b"\0" terminating the "<type> <size>"
        # header appears in the decompressed data.
        # NOTE(review): if the stream never contains b"\0" and f.read()
        # returns b"", this loop would not terminate — confirm callers only
        # pass well-formed streams.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the header terminator is the object payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level, -1 for the default.
        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached sha; it is lazily recomputed by sha().
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        return self._chunked_text  # type: ignore

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            # Trust the caller-supplied sha instead of recomputing it.
            self._sha = FixedSha(sha)  # type: ignore
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # The numeric object type lives in bits 4-6 of the first byte.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is variable-length encoded; skip bytes while the
        # continuation bit (0x80) is set.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True if *magic* starts like a zlib (legacy format) stream."""
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        # zlib streams have 0x08 (deflate) in the low nibble of the first
        # byte and a two-byte header divisible by 31.
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Read a ShaFile (legacy or new style) from an open file object."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate this object's fields from serialized chunks (subclass hook)."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Serialize this object's fields into chunks (subclass hook)."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> "ShaFile":
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Re-deserializing the serialized form exercises the format
            # checks implemented by the subclass's _deserialize().
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the object header ("<type> <size>" + NUL) for this object."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return a debug representation including the object's hex sha."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id
class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        super().__init__()
        self._chunked_text = []
        # A freshly created blob is empty, so the empty chunk list is
        # already an up-to-date serialization.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        # Blobs have no internal structure: the chunks ARE the serialization.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Blob":
        """Open a blob from disk.

        Raises:
          NotBlobError: if the object stored at *path* is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(path)
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings, and is equivalent to
        ``b"".join(self.chunked).splitlines(True)`` without materializing
        the whole blob as one byte string.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            return chunks[0].splitlines(True)
        remaining = None
        ret: list[bytes] = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if not lines:
                continue
            # Join any carried-over partial line with the first line of
            # this chunk.
            lines[0] = (remaining or b"") + lines[0]
            # The last line may be incomplete: it may lack a terminator, or
            # end in a lone b"\r" that could pair with a b"\n" at the start
            # of the next chunk to form a single b"\r\n" ending.  Carry it
            # over instead of emitting it now.
            remaining = lines.pop()
            ret.extend(lines)
            if remaining.endswith(b"\n"):
                # A b"\n" always terminates a line, so the carry-over is
                # complete.  (Previously a line completed exactly at a chunk
                # boundary was wrongly merged into the following chunk.)
                ret.append(remaining)
                remaining = None
        if remaining is not None:
            ret.append(remaining)
        return ret
def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
        order read from the text, possibly including duplicates. Includes a
        field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None
    v = b""
    eof = False

    def _strip_last_newline(value: bytes) -> bytes:
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                # (with its trailing newline stripped).
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)
    else:
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()
def _format_message(
    headers: list[tuple[bytes, bytes]], body: Optional[bytes]
) -> Iterator[bytes]:
    """Serialize (headers, body) into raw tag/commit message chunks."""
    for key, value in headers:
        value_lines = value.split(b"\n")
        yield git_line(key, value_lines[0])
        # Multi-line header values continue on lines indented by one space.
        for continuation in value_lines[1:]:
            yield b" " + continuation + b"\n"
    yield b"\n"  # There must be a new line after the headers
    if body:
        yield body
class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    # Tag message text (excluding any trailing PGP signature); None if absent.
    _message: Optional[bytes]
    # Name of the tag.
    _name: Optional[bytes]
    # ShaFile subclass of the tagged object.
    _object_class: Optional[type["ShaFile"]]
    # Hex sha of the tagged object.
    _object_sha: Optional[bytes]
    # Detached PGP signature, if the tag is signed.
    _signature: Optional[bytes]
    # Tag creation time, in seconds since the epoch.
    _tag_time: Optional[int]
    # Timezone of tag_time (see parse_time_entry for its encoding).
    _tag_timezone: Optional[int]
    # Whether the timezone is a negative UTC offset written as "-0000".
    _tag_timezone_neg_utc: Optional[bool]
    # Identity of the tagger.
    _tagger: Optional[bytes]

    def __init__(self) -> None:
        """Create an empty tag; fields are filled by deserialization or setters."""
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: Optional[bytes] = None

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tag":
        """Open a Tag from disk.

        Raises:
          NotTagError: if the object stored at *filename* is not a tag
        """
        tag = ShaFile.from_path(filename)
        if not isinstance(tag, cls):
            raise NotTagError(filename)
        return tag

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Verify the headers appear in canonical order:
        # object, then type, then tag, then tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

    def _serialize(self) -> list[bytes]:
        """Serialize tag headers, message and signature into chunks."""
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp: emit the bare identity.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # The detached signature, if any, is appended after the message.
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # The freeform body: message plus optional PGP signature.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        self._message = value
                        self._signature = None
                    else:
                        # Split the body at the start of the signature block.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

    def _get_object(self) -> tuple[type[ShaFile], bytes]:
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).
        """
        if self._object_class is None or self._object_sha is None:
            raise ValueError("Tag object is not properly initialized")
        return (self._object_class, self._object_sha)

    def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
        """Set the (object class, sha) pair this tag points at."""
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

    def sign(self, keyid: Optional[str] = None) -> None:
        """Sign this tag with GPG, storing the detached signature.

        Args:
          keyid: Optional GPG key id to sign with; when omitted, the
            default signing key is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.signature, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.signature, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.signature is a signature for the returned raw byte string serialization.
        """
        ret = self.as_raw_string()
        if self._signature:
            # The signature is always the trailing part of the serialization.
            ret = ret[: -len(self._signature)]
        return ret

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the outer loop variable `key` is unused
                    # and the inner loop iterates `keys` again, which looks
                    # redundant — confirm intent against upstream history.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
    """Named tuple encapsulating a single tree entry."""

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Directory path (bytes) to prepend to this entry's path.
        Raises:
          TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Report the value that failed the check (self.path), not the
            # *path* argument as the previous message mistakenly did.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, reject modes written with a leading zero
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Each entry is "<octal mode> <name>\0<20-byte binary sha>".
        space = text.index(b" ", pos)
        mode_bytes = text[pos:space]
        if strict and mode_bytes.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}")
        try:
            mode = int(mode_bytes, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}") from exc
        nul = text.index(b"\0", space)
        name = text[space + 1 : nul]
        pos = nul + 21
        raw_sha = text[nul + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))
def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # "<octal mode, at least 4 digits> <name>\0<raw 20-byte sha>"
        yield (b"%04o " % mode) + name + b"\0" + hex_to_sha(hexsha)
def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Directories compare as if their name had a trailing '/', matching
    git's canonical tree ordering.

    Args:
      entry: (name, value) tuple
    """
    name, (mode, _unused_sha) = entry
    if stat.S_ISDIR(mode):
        return name + b"/"
    return name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in plain name order (no '/' suffix for dirs)."""
    name, _unused_value = entry
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Encoding used to decode the entry name
    Returns: string describing the tree entry
    """
    # Anything with the directory bit set is shown as a tree, everything
    # else as a blob.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record where the submodule sits and which commit it points at."""
        self.sha = sha
        self.path = path
class Tree(ShaFile):
    """A Git tree object.

    Maps entry names to (mode, hex sha) pairs and supports a mutable
    mapping-style interface keyed by entry name.
    """

    type_name = b"tree"
    type_num = 2

    # Single slot; a plain string is equivalent to a one-element tuple here.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Create an empty tree."""
        super().__init__()
        # Maps entry name -> (mode, hex sha).
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Read a tree from a file on disk.

        Raises:
          NotTreeError: if the object stored at ``filename`` is not a tree.
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(filename)
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check whether an entry with the given name exists."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Return the (mode, hexsha) tuple for the named entry."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        # Invalidate any cached serialization of this object.
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Remove the named entry and invalidate the cached serialization."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return the number of entries in this tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over entry names (dict order, not tree order)."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # The only modes git writes into trees: regular files (755/644),
        # symlinks, directories, gitlinks, plus the legacy 664 group-write
        # mode.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be sorted in tree order (directories compare as
            # if they had a trailing '/') and names must be unique.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize the entries in canonical tree order."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable listing of the tree, one entry per line."""
        text: list[str] = []
        for name, mode, hexsha in self.iteritems():
            text.append(pretty_format_tree_entry(name, mode, hexsha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.
        Raises:
          SubmoduleEncountered: if a gitlink entry is hit before the path
            is fully resolved.
          NotTreeError: if an intermediate component is not a tree.
          ValueError: if no path component was resolved at all.
        """
        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/trailing or doubled slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    magnitude = int(text[1:])
    if sign == b"-":
        magnitude = -magnitude
    # "-0000" (or a doubled sign like "--700") is recorded specially so it
    # can be round-tripped by format_timezone.
    negated_utc = magnitude >= 0 and sign == b"-"
    direction = -1 if magnitude < 0 else 1
    magnitude = abs(magnitude)
    hours, minutes = divmod(magnitude, 100)
    return (direction * (hours * 3600 + minutes * 60), negated_utc)
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = b"-"
        offset = -offset
    else:
        sign = b"+"
    # Emit as "<sign>HHMM".
    hours, minutes = divmod(offset // 60, 60)
    return sign + (b"%02d%02d" % (hours, minutes))
def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse event.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    sep = value.rfind(b"> ")
    if sep == -1:
        # No "<email>" terminator: treat the whole value as the identity,
        # with no timestamp information.
        return (value, None, (None, False))
    try:
        person = value[: sep + 1]
        timetext, timezonetext = value[sep + 2 :].rsplit(b" ", 1)
        timestamp = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, timestamp, (timezone, timezone_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event line: identity, timestamp, timezone."""
    timezone, timezone_neg_utc = timezone_info
    timestamp = str(time).encode("ascii")
    return person + b" " + timestamp + b" " + format_timezone(timezone, timezone_neg_utc)
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    tree = None
    parents: list[bytes] = []
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag: list[Tag] = []
    gpgsig = None
    message = None
    extra: list[tuple[bytes, bytes]] = []

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Mergetags are embedded tag objects; restore the trailing
            # newline stripped during message parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # A None field marks the commit message body.
            message = value
        elif value is None:
            raise ObjectFormatException(f"missing value for field {field!r}")
        else:
            # Unknown headers are preserved verbatim for reserialization.
            extra.append((field, value))

    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )
class Commit(ShaFile):
    """A git commit object.

    Holds the tree, parents, author/committer identities with their
    timestamps and timezones, optional encoding, mergetags and GPG
    signature, plus any unknown headers (kept verbatim in ``_extra``).
    """

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Create an empty commit object."""
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: Optional[bytes] = None
        self._mergetag: list[Tag] = []
        self._gpgsig: Optional[bytes] = None
        self._extra: list[tuple[bytes, Optional[bytes]]] = []
        self._author_timezone_neg_utc: Optional[bool] = False
        self._commit_timezone_neg_utc: Optional[bool] = False

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Commit":
        """Read a commit from a file on disk.

        Raises:
          NotCommitError: if the object stored at ``path`` is not a commit.
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(path)
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields by parsing the serialized chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        # Placeholder (person, time, (tz, tz_neg_utc)) triples until the
        # author/committer headers are parsed.
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Mergetags are embedded tag objects; restore the trailing
                # newline stripped during message parsing.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # A None field marks the commit message body.
                self._message = value
            else:
                # Unknown headers are kept verbatim for reserialization.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Validate header ordering: tree first, then parents, author,
        # committer and (optionally) encoding, in that sequence.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached, armored GPG signature over this commit
        and store it in ``gpgsig``.

        Args:
          keyid: Optional id of the key to sign with; if not given, the
            context's default signing key is used.
        """
        # Imported lazily so dulwich works without the gpg bindings installed.
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Work on a copy so this object's cached serialization stays intact;
        # clearing both the slot and the property forces reserialization
        # without the gpgsig header.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        # Unsigned commits trivially pass verification.
        if self._gpgsig is None:
            return

        # Imported lazily so dulwich works without the gpg bindings installed.
        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises gpg.errors.BadSignatures on a bad signature.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the inner loop iterates `keys` again
                    # rather than `key.subkeys`, making the outer loop
                    # redundant — presumably `key.subkeys` was intended;
                    # confirm upstream.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message in canonical commit order."""
        headers = []
        assert self._tree is not None
        # Accept either a Tree object or a raw hex sha for the tree field.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Drop the trailing newline; it is re-added on deserialization.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
# All concrete git object classes handled by this module.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Lookup table mapping both the type name (bytes) and the type number (int)
# of each object class to the class itself.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls
# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Rust extension not built/installed; keep the pure-Python versions.
    pass
else:
    # Shadow the pure-Python implementations with the faster Rust ones.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs