Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 46%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# objects.py -- Access to base git objects
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Access to base git objects."""
25import binascii
26import os
27import posixpath
28import stat
29import sys
30import zlib
31from collections import namedtuple
32from collections.abc import Callable, Iterable, Iterator
33from hashlib import sha1
34from io import BufferedIOBase, BytesIO
35from typing import (
36 IO,
37 TYPE_CHECKING,
38 Optional,
39 Union,
40)
42if sys.version_info >= (3, 11):
43 from typing import Self
44else:
45 from typing_extensions import Self
47try:
48 from typing import TypeGuard # type: ignore
49except ImportError:
50 from typing_extensions import TypeGuard
52from . import replace_me
53from .errors import (
54 ChecksumMismatch,
55 FileFormatException,
56 NotBlobError,
57 NotCommitError,
58 NotTagError,
59 NotTreeError,
60 ObjectFormatException,
61)
62from .file import GitFile
64if TYPE_CHECKING:
65 from _hashlib import HASH
67 from .file import _GitFile
# All-zeros hex sha, used by git to denote a nonexistent object.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"

# File mode git stores for a submodule (gitlink) tree entry.
S_IFGITLINK = 0o160000

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Marker that introduces a detached PGP signature inside a tag/commit body.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"

# Object IDs are hex shas represented as bytes.
ObjectID = bytes
# Raised by ShaFile._parse_file when a loose-object file contains no data.
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered."""
def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    # Compare only the file-format bits of the mode against the gitlink mode.
    format_bits = stat.S_IFMT(m)
    return format_bits == S_IFGITLINK
112def _decompress(string: bytes) -> bytes:
113 dcomp = zlib.decompressobj()
114 dcomped = dcomp.decompress(string)
115 dcomped += dcomp.flush()
116 return dcomped
def sha_to_hex(sha: ObjectID) -> bytes:
    """Takes a string and returns the hex of the sha within."""
    # A 20-byte binary sha hexlifies to exactly 40 hex digits.
    hexsha = binascii.hexlify(sha)
    assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}"
    return hexsha
126def hex_to_sha(hex: Union[bytes, str]) -> bytes:
127 """Takes a hex sha and returns a binary sha."""
128 assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
129 try:
130 return binascii.unhexlify(hex)
131 except TypeError as exc:
132 if not isinstance(hex, bytes):
133 raise
134 raise ValueError(exc.args[0]) from exc
def valid_hexsha(hex: Union[bytes, str]) -> bool:
    """Return True if *hex* is a well-formed 40-character hex sha."""
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
        return True
    except (TypeError, binascii.Error):
        return False
def hex_to_filename(
    path: Union[str, bytes], hex: Union[str, bytes]
) -> Union[str, bytes]:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join refuses to mix str and bytes arguments; hex normally
    # arrives as bytes, so align it with the type of the base path.
    if type(path) is not type(hex) and isinstance(path, str):
        hex = hex.decode("ascii")  # type: ignore
    # Loose objects are fanned out into a two-character directory followed by
    # a file named after the remaining 38 characters.
    return os.path.join(path, hex[:2], hex[2:])  # type: ignore
def filename_to_hex(filename: Union[str, bytes]) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # Only the last two path components matter: the 2-character fan-out
    # directory and the 38-character object file name.
    parts = filename.rsplit(os.path.sep, 2)[-2:]  # type: ignore
    bad_name_msg = f"Invalid object filename: {filename!r}"
    assert len(parts) == 2, bad_name_msg
    fan_out, remainder = parts
    assert len(fan_out) == 2 and len(remainder) == 38, bad_name_msg
    hexsha_bytes = (fan_out + remainder).encode("ascii")  # type: ignore
    # Validates that the combined name really is hexadecimal.
    hex_to_sha(hexsha_bytes)
    return hexsha_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    obj_cls = object_class(num_type)
    if obj_cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    # "<type> <size>\0", as stored in front of every loose object body.
    return b"".join([obj_cls.type_name, b" ", str(length).encode("ascii"), b"\0"])
def serializable_property(name: str, docstring: Optional[str] = None) -> property:
    """A property that helps tracking whether serialization is necessary."""
    attr = "_" + name

    def _setter(obj: "ShaFile", value: object) -> None:
        # Any write through the property invalidates the serialized form.
        setattr(obj, attr, value)
        obj._needs_serialization = True

    def _getter(obj: "ShaFile") -> object:
        return getattr(obj, attr)

    return property(_getter, _setter, doc=docstring)
def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)
def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
def check_identity(identity: Optional[bytes], error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is missing or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Short-circuit evaluation: each condition is only evaluated when the
    # previous ones held, so identity[email_start - 1] is never reached with
    # an out-of-range index.  The previous eager all([...]) form raised
    # IndexError (instead of ObjectFormatException) for identities shorter
    # than two bytes, because the list built every element unconditionally.
    valid = (
        email_start >= 1  # "<" present, with at least one byte before it
        and identity[email_start - 1] == b" "[0]  # space precedes the email
        and identity.find(b"<", email_start + 1) == -1  # only one "<"
        and email_end == len(identity) - 1  # ">" terminates the identity
        and b"\0" not in identity
        and b"\n" not in identity
    )
    if not valid:
        raise ObjectFormatException(error_msg)
def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds
    Raises:
      ObjectFormatException: if the time exceeds the signed 64-bit maximum
    """
    # Reject timestamps that would overflow a signed 64-bit time field.
    if not time_seconds <= MAX_TIME:
        raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    joined = b" ".join(items)
    return joined + b"\n"
class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: Union[str, bytes]) -> None:
        """Wrap a known 40-character hex sha, given as str or bytes."""
        if isinstance(hexsha, str):
            hexsha = hexsha.encode("ascii")  # type: ignore
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hexsha
        # Keep the binary form around so digest() needs no conversion.
        self._sha = hex_to_sha(hexsha)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")
# Type guard functions for runtime type narrowing
#
# The TYPE_CHECKING branch gives the type checker TypeGuard-annotated
# signatures so `if is_commit(obj): ...` narrows obj to Commit.  The runtime
# branch defines plain-bool twins with identical bodies, so no TypeGuard
# subscripts are evaluated when the module is imported.
if TYPE_CHECKING:

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
class ShaFile:
    """A git SHA file.

    Base class for git objects.  Instances keep their serialized body as a
    list of byte chunks and lazily (re)compute their SHA-1 when requested.
    """

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when attribute changes have made the cached chunks stale.
    _needs_serialization: bool
    # Set by each subclass: the textual and numeric git object type.
    type_name: bytes
    type_num: int
    # Serialized body as a list of byte chunks (not necessarily lines).
    _chunked_text: Optional[list[bytes]]
    # Cached SHA object; None means it must be recomputed.
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the NUL terminating the "<type> <size>" header
        # appears in the decompressed data.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the header's NUL is the object body.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level (-1 selects the default)
        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA before re-serializing.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        return self._chunked_text  # type: ignore

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> None:
        """Set the contents of this object from a list of chunks.

        Args:
          chunks: serialized body chunks
          sha: optional known hex sha; when given it is trusted and cached
        """
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)  # type: ignore
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Bits 4-6 of the first byte carry the numeric object type.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is variable-length: keep consuming bytes while the
        # continuation bit (0x80) is set.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True if *magic* starts with a zlib (legacy loose) header."""
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        # zlib headers use compression method 8 and a 16-bit header word that
        # is a multiple of 31.
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Read an object from an open file, dispatching on header style."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        # Subclass hook: populate attributes from serialized chunks.
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        # Subclass hook: produce serialized chunks from attributes.
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip: re-deserializing our own serialization must yield
            # the same id, otherwise the object is internally inconsistent.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        # "<type> <size>\0" header used when hashing and writing the object.
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return a debug representation including the object id."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id
class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        super().__init__()
        # A blob's body *is* its serialization, so a fresh blob is already
        # in serialized form.
        self._chunked_text = []
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        # No header fields to format: the chunks are the serialization.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Blob":
        """Open a blob from disk.

        Raises:
          NotBlobError: if the object at *path* is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(path)
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            return chunks[0].splitlines(True)
        # Chunk boundaries may fall in the middle of a line, so the
        # unterminated tail of each chunk is carried in "remaining" and
        # prepended to the next chunk's first line.
        # NOTE(review): a carried tail that already ends in b"\n" is still
        # merged with the following chunk's first line — for chunks ending
        # exactly at a line boundary this looks like it diverges from
        # b"".join(chunks).splitlines(True); confirm against upstream tests.
        remaining = None
        ret = []
        for chunk in chunks:
            lines = chunk.splitlines(True)
            if len(lines) > 1:
                ret.append((remaining or b"") + lines[0])
                ret.extend(lines[1:-1])
                remaining = lines[-1]
            elif len(lines) == 1:
                if remaining is None:
                    remaining = lines.pop()
                else:
                    remaining += lines.pop()
        if remaining is not None:
            ret.append(remaining)
        return ret
728def _parse_message(
729 chunks: Iterable[bytes],
730) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]:
731 """Parse a message with a list of fields and a body.
733 Args:
734 chunks: the raw chunks of the tag or commit object.
735 Returns: iterator of tuples of (field, value), one per header line, in the
736 order read from the text, possibly including duplicates. Includes a
737 field named None for the freeform tag/commit text.
738 """
739 f = BytesIO(b"".join(chunks))
740 k = None
741 v = b""
742 eof = False
744 def _strip_last_newline(value: bytes) -> bytes:
745 """Strip the last newline from value."""
746 if value and value.endswith(b"\n"):
747 return value[:-1]
748 return value
750 # Parse the headers
751 #
752 # Headers can contain newlines. The next line is indented with a space.
753 # We store the latest key as 'k', and the accumulated value as 'v'.
754 for line in f:
755 if line.startswith(b" "):
756 # Indented continuation of the previous line
757 v += line[1:]
758 else:
759 if k is not None:
760 # We parsed a new header, return its value
761 yield (k, _strip_last_newline(v))
762 if line == b"\n":
763 # Empty line indicates end of headers
764 break
765 (k, v) = line.split(b" ", 1)
767 else:
768 # We reached end of file before the headers ended. We still need to
769 # return the previous header, then we need to return a None field for
770 # the text.
771 eof = True
772 if k is not None:
773 yield (k, _strip_last_newline(v))
774 yield (None, None)
776 if not eof:
777 # We didn't reach the end of file while parsing headers. We can return
778 # the rest of the file as a message.
779 yield (None, f.read())
781 f.close()
def _format_message(
    headers: list[tuple[bytes, bytes]], body: Optional[bytes]
) -> Iterator[bytes]:
    """Format headers and an optional body into serialized message chunks.

    Multi-line header values are emitted as continuation lines prefixed with
    a single space, mirroring _parse_message.
    """
    for field, value in headers:
        first, *continuations = value.split(b"\n")
        yield git_line(field, first)
        for continuation in continuations:
            yield b" " + continuation + b"\n"
    yield b"\n"  # There must be a new line after the headers
    if body:
        yield body
class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    _message: Optional[bytes]
    _name: Optional[bytes]
    _object_class: Optional[type["ShaFile"]]
    _object_sha: Optional[bytes]
    _signature: Optional[bytes]
    _tag_time: Optional[int]
    _tag_timezone: Optional[int]
    _tag_timezone_neg_utc: Optional[bool]
    _tagger: Optional[bytes]

    def __init__(self) -> None:
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: Optional[bytes] = None

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tag":
        """Open a tag from disk.

        Raises:
          NotTagError: if the object at *filename* is not a tag
        """
        tag = ShaFile.from_path(filename)
        if not isinstance(tag, cls):
            raise NotTagError(filename)
        return tag

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce canonical header order: object, type, tag, then tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field

    def _serialize(self) -> list[bytes]:
        """Serialize the tag into header/body chunks."""
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp: emit the identity alone.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # A detached signature, when present, is appended verbatim after
            # the message body.
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # Freeform body: message plus optional PGP signature block.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        self._message = value
                        self._signature = None
                    else:
                        # Split the body at the signature marker.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

    def _get_object(self) -> tuple[type[ShaFile], bytes]:
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).
        """
        if self._object_class is None or self._object_sha is None:
            raise ValueError("Tag object is not properly initialized")
        return (self._object_class, self._object_sha)

    def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached armored PGP signature over this tag.

        Args:
          keyid: optional id of the key to sign with; when omitted, gpg's
            default signing key is used
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.signature, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.signature, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.signature is a signature for the returned raw byte string serialization.
        """
        ret = self.as_raw_string()
        if self._signature:
            # The signature is always the trailing part of the serialization.
            ret = ret[: -len(self._signature)]
        return ret

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): the inner loop iterates ``keys`` again
                    # rather than ``key.subkeys`` — presumably intended to
                    # walk each key's subkeys; confirm against upstream.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
    """Named tuple encapsulating a single tree entry."""

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended."""
        if not isinstance(self.path, bytes):
            raise TypeError(f"Expected bytes for path, got {path!r}")
        joined = posixpath.join(path, self.path)
        return TreeEntry(joined, self.mode, self.sha)
def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, reject octal modes with a leading zero
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    # Entries are packed back to back as "<mode> <name>\0<20-byte sha>".
    while pos < end:
        space_at = text.index(b" ", pos)
        mode_bytes = text[pos:space_at]
        if strict and mode_bytes.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}")
        try:
            mode = int(mode_bytes, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_bytes!r}") from exc
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]
        pos = nul_at + 21
        raw_sha = text[nul_at + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))
def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks, one per entry
    """
    for name, mode, hexsha in items:
        # Entry wire format: "<octal mode> <name>\0<binary sha>".
        mode_field = f"{mode:04o}".encode("ascii")
        yield mode_field + b" " + name + b"\0" + hex_to_sha(hexsha)
def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Directories sort as if their name had a trailing slash, matching
    git tree ordering.

    Args:
      entry: (name, (mode, sha)) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order (ignores mode/sha)."""
    name, _value = entry
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Encoding used to decode the entry name for display
    Returns: string describing the tree entry
    """
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    display_name = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{display_name}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record the path at which the submodule sits and its SHA."""
        self.path = path
        self.sha = sha
class Tree(ShaFile):
    """A Git tree object.

    Maps entry names (bytes) to (mode, hexsha) tuples and provides a
    dict-like interface plus (de)serialization to the git tree wire format.
    """

    type_name = b"tree"
    type_num = 2

    # Single slot: the entry dictionary (name -> (mode, hexsha)).
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Create an empty tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Load a tree from a file on disk.

        Raises:
          NotTreeError: if the file does not contain a tree object.
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(filename)
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check whether an entry with the given name exists."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Return the (mode, hexsha) tuple for the named entry."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Remove the named entry and mark the tree for reserialization."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return the number of entries in this tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over entry names (dict order, not git tree order)."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        # Re-parse in strict mode so leading-zero modes are rejected too.
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            entry = (name, (mode, sha))
            if last:
                # Entries must be sorted in tree order (key_entry treats
                # directories as if their name had a trailing '/').
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize entries in tree order into the canonical chunk list."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable, ls-tree-style listing of this tree."""
        text: list[str] = []
        for name, mode, hexsha in self.iteritems():
            text.append(pretty_format_tree_entry(name, mode, hexsha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if a gitlink entry is hit before the path
            is fully resolved.
          NotTreeError: if an intermediate component is not a tree.
          ValueError: if the path contains no non-empty components.
        """
        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/trailing/double slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError("Timezone must start with + or - ({})".format(text))
    sign_byte = text[:1]
    magnitude = int(text[1:])
    if sign_byte == b"-":
        magnitude = -magnitude
    # b"--700" yields a positive offset under a '-' sign: remember that.
    negative_utc = magnitude >= 0 and sign_byte == b"-"
    direction = -1 if magnitude < 0 else 1
    magnitude = abs(magnitude)
    hours, minutes = divmod(magnitude, 100)
    return (
        direction * (hours * 3600 + minutes * 60),
        negative_utc,
    )
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Truncate toward zero (like the original's float-to-%d conversion) so
    # the unusual double-negative case keeps its historical output.
    hours = int(offset / 3600)
    minutes = int(offset / 60 % 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")
def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse an author/committer/tagger event line.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        close_angle = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: identity only, no timestamp/timezone.
        return (value, None, (None, False))
    try:
        person = value[: close_angle + 1]
        timetext, timezonetext = value[close_angle + 2 :].rsplit(b" ", 1)
        when = int(timetext)
        tz_offset, tz_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, when, (tz_offset, tz_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an author/committer/tagger event line.

    Args:
      person: Identity bytes ("Name <email>")
      time: Seconds since the epoch
      timezone_info: (offset_seconds, negative_utc) pair
    """
    tz_offset, tz_neg_utc = timezone_info
    fields = [
        person,
        str(time).encode("ascii"),
        format_timezone(tz_offset, tz_neg_utc),
    ]
    return b" ".join(fields)
# Deprecated module-level parser (see the replace_me decorator); the
# equivalent logic lives in Commit._deserialize.
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    Raises:
      ObjectFormatException: if a known header is present without a value
    """
    parents = []
    extra = []
    tree = None
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    # Dispatch each (header, value) pair; a None field carries the message.
    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Mergetag values lost their trailing newline during message
            # parsing; restore it before parsing the embedded tag.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            message = value
        else:
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            # Unknown headers are preserved verbatim for reserialization.
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )
class Commit(ShaFile):
    """A git commit object.

    Holds the tree, parents, author/committer identities with timestamps,
    optional encoding, mergetags and GPG signature, plus the message.
    """

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Initialize an empty commit with default (unset) fields."""
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: Optional[bytes] = None
        self._mergetag: list[Tag] = []
        self._gpgsig: Optional[bytes] = None
        self._extra: list[tuple[bytes, Optional[bytes]]] = []
        # Default to plain '+0000' rather than the '-0000' spelling.
        self._author_timezone_neg_utc: Optional[bool] = False
        self._commit_timezone_neg_utc: Optional[bool] = False

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Commit":
        """Load a commit from a file on disk.

        Raises:
          NotCommitError: if the file does not contain a commit object.
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(path)
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields from the serialized chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        # Dispatch each (header, value) pair; a None field carries the message.
        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Restore the trailing newline stripped during parsing.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                self._message = value
            else:
                # Unknown headers are kept verbatim for reserialization.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce the canonical header order:
        # tree, parent*, author, committer, encoding?, ...
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None) -> None:
        """Create a detached GPG signature and store it in ``self.gpgsig``.

        Args:
          keyid: Optional key id to sign with; otherwise gpg's default
            signing key is used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        # The property assignment presumably also marks the copy for
        # reserialization (via serializable_property), which makes the
        # direct _gpgsig assignment above look redundant — confirm.
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    # NOTE(review): this inner loop iterates `keys` again,
                    # leaving the outer `key` unused; it looks like it was
                    # meant to iterate key.subkeys — confirm against gpgme.
                    for subkey in keys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message in canonical commit order."""
        headers = []
        assert self._tree is not None
        # Accept either a Tree object or a raw hex sha for the tree field.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the trailing newline; it is re-added when parsing.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
# All concrete ShaFile subclasses; used to build the type lookup table below.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps both the textual type name (e.g. b"commit") and the numeric type
# (e.g. 1) of every object class to the class itself.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls
# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Rust extension not built/installed; keep the pure-Python versions.
    pass
else:
    # Shadow the pure-Python functions with the faster Rust implementations.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs