Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# objects.py -- Access to base git objects
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Access to base git objects."""
25import binascii
26import os
27import posixpath
28import stat
29import sys
30import zlib
31from collections.abc import Callable, Iterable, Iterator, Sequence
32from hashlib import sha1
33from io import BufferedIOBase, BytesIO
34from typing import (
35 IO,
36 TYPE_CHECKING,
37 NamedTuple,
38 Optional,
39 TypeVar,
40 Union,
41)
43if sys.version_info >= (3, 11):
44 from typing import Self
45else:
46 from typing_extensions import Self
48if sys.version_info >= (3, 10):
49 from typing import TypeGuard
50else:
51 from typing_extensions import TypeGuard
53from . import replace_me
54from .errors import (
55 ChecksumMismatch,
56 FileFormatException,
57 NotBlobError,
58 NotCommitError,
59 NotTagError,
60 NotTreeError,
61 ObjectFormatException,
62)
63from .file import GitFile
65if TYPE_CHECKING:
66 from _hashlib import HASH
68 from .file import _GitFile
# All-zero hex SHA, used by git to denote "no object" (e.g. absent refs).
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"

# File mode used by git for submodule (gitlink) tree entries.
S_IFGITLINK = 0o160000

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Armor lines that introduce a detached signature in a tag/commit body.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"

# Object IDs are 40-character hex SHA strings, kept as bytes.
ObjectID = bytes
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised by ``ShaFile._parse_file`` when an object file contains no data.
    """
def S_ISGITLINK(m: int) -> bool:
    """Return True if file mode *m* denotes a git submodule (gitlink) entry.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    return S_IFGITLINK == stat.S_IFMT(m)
def _decompress(string: bytes) -> bytes:
    """Fully inflate a zlib-compressed byte string and return the result."""
    inflater = zlib.decompressobj()
    return inflater.decompress(string) + inflater.flush()
def sha_to_hex(sha: ObjectID) -> bytes:
    """Convert a 20-byte binary SHA-1 digest into its 40-byte hex form."""
    result = binascii.hexlify(sha)
    assert len(result) == 40, f"Incorrect length of sha1 string: {result!r}"
    return result
def hex_to_sha(hex: Union[bytes, str]) -> bytes:
    """Convert a 40-character hex sha into its 20-byte binary form."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        return binascii.unhexlify(hex)
    except TypeError as exc:
        # Bad characters in a bytes input surface as TypeError from
        # unhexlify; normalize those to ValueError.  Anything else (i.e.
        # non-bytes input) propagates unchanged.
        if isinstance(hex, bytes):
            raise ValueError(exc.args[0]) from exc
        raise
def valid_hexsha(hex: Union[bytes, str]) -> bool:
    """Report whether *hex* is a well-formed 40-character hex SHA.

    Args:
      hex: Hex string to check

    Returns:
      True if valid hex SHA, False otherwise
    """
    if len(hex) != 40:
        return False
    try:
        binascii.unhexlify(hex)
        return True
    except (TypeError, binascii.Error):
        return False
# Constrained type variable: path helpers accept str or bytes and return the
# same string type they were given.
PathT = TypeVar("PathT", str, bytes)
def hex_to_filename(path: PathT, hex: Union[str, bytes]) -> PathT:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join requires all components to be of one type, so coerce the
    # sha to match ``path`` before splitting it into the two-character
    # fan-out directory and the 38-character file name.
    if isinstance(path, bytes):
        sha_b = hex.encode("ascii") if isinstance(hex, str) else hex
        joined_b = os.path.join(path, sha_b[:2], sha_b[2:])
        assert isinstance(joined_b, bytes)
        return joined_b
    else:
        sha_s = hex.decode("ascii") if isinstance(hex, bytes) else hex
        joined_s = os.path.join(path, sha_s[:2], sha_s[2:])
        assert isinstance(joined_s, str)
        return joined_s
def filename_to_hex(filename: Union[str, bytes]) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # Loose objects live at <dir>/<2-char prefix>/<38-char rest>; rejoin the
    # last two path components to recover the 40-character sha.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, bytes):
        sep = os.path.sep
        if isinstance(sep, str):
            sep = sep.encode("ascii")
        parts = filename.rsplit(sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        prefix, rest = parts
        assert len(prefix) == 2 and len(rest) == 38, errmsg
        hex_bytes = prefix + rest
    else:
        parts_s = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts_s) == 2, errmsg
        prefix_s, rest_s = parts_s
        assert len(prefix_s) == 2 and len(rest_s) == 38, errmsg
        hex_bytes = (prefix_s + rest_s).encode("ascii")
    # Validates the result; raises if it is not a proper sha.
    hex_to_sha(hex_bytes)
    return hex_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    obj_cls = object_class(num_type)
    if obj_cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    return b"".join([obj_cls.type_name, b" ", str(length).encode("ascii"), b"\0"])
def serializable_property(name: str, docstring: Optional[str] = None) -> property:
    """A property that helps tracking whether serialization is necessary."""
    # Backing attribute is the property name with a leading underscore.
    attr = "_" + name

    def _setter(obj: "ShaFile", value: object) -> None:
        """Store the value and flag the object as needing re-serialization.

        Args:
          obj: The ShaFile object
          value: The value to set
        """
        setattr(obj, attr, value)
        obj._needs_serialization = True

    def _getter(obj: "ShaFile") -> object:
        """Return the stored value of the property.

        Args:
          obj: The ShaFile object

        Returns:
          The property value
        """
        return getattr(obj, attr)

    return property(_getter, _setter, doc=docstring)
def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
    """Look up the ShaFile subclass for a type name or numeric type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    try:
        return _TYPE_MAP[type]
    except KeyError:
        return None
def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
    """Validate that *hex* is a well-formed hex sha, raising on failure.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
def check_identity(identity: Optional[bytes], error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # A valid identity has the shape b"Name <email>": exactly one "<",
    # preceded by a space, with ">" as the very last byte, and no NUL or
    # newline anywhere.  All sub-checks are evaluated eagerly inside the
    # list (no short-circuiting), which is deliberate.
    if not all(
        [
            email_start >= 1,
            identity[email_start - 1] == b" "[0],  # byte before "<" is a space
            identity.find(b"<", email_start + 1) == -1,  # only one "<"
            email_end == len(identity) - 1,  # ">" terminates the string
            b"\0" not in identity,
            b"\n" not in identity,
        ]
    ):
        raise ObjectFormatException(error_msg)
def _path_to_bytes(path: Union[str, bytes]) -> bytes:
    """Convert a path to bytes for use in error messages."""
    if isinstance(path, bytes):
        return path
    return path.encode("utf-8", "surrogateescape")
def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds
    """
    if time_seconds <= MAX_TIME:
        return
    # Times larger than a signed 64-bit value would overflow downstream.
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Join byte items with single spaces and terminate with a newline."""
    line = b" ".join(items)
    return line + b"\n"
class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: Union[str, bytes]) -> None:
        """Initialize FixedSha with a fixed SHA value.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        normalized = hexsha.encode("ascii") if isinstance(hexsha, str) else hexsha
        if not isinstance(normalized, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {normalized!r}")
        self._hexsha = normalized
        self._sha = hex_to_sha(normalized)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")
# Type guard functions for runtime type narrowing
if TYPE_CHECKING:
    # TypeGuard variants: allow static checkers to narrow a ShaFile to the
    # concrete subclass when the predicate returns True.  The runtime
    # versions below are behaviorally identical but avoid depending on
    # TypeGuard at import time.

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when parsed attributes changed since the last serialization, so
    # _chunked_text must be regenerated before use.
    _needs_serialization: bool
    # Type name (e.g. b"blob") and numeric type; set by each subclass.
    type_name: bytes
    type_num: int
    # Cached serialized representation as a list of byte chunks.
    _chunked_text: Optional[list[bytes]]
    # Cached SHA object; None when it needs recomputation.
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep reading and inflating until the NUL that terminates the
        # "<type> <size>" header appears in the decompressed data.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL separator is the object payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level (-1 selects the default)
        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached sha before regenerating the text.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: Optional[ObjectID] = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> None:
        """Set the contents of this object from a list of chunks.

        Args:
          chunks: Serialized chunks of the object
          sha: Optional known hex sha; stored as-is without verification
        """
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Bits 4-6 of the first byte encode the numeric object type.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is varint-encoded: continue while the high bit is set.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Check whether *magic* starts like a bare zlib stream (legacy format)."""
        # Legacy loose objects are a plain zlib stream: low nibble of the
        # first byte is 8 (deflate) and the first two bytes, read as a
        # big-endian word, are a multiple of 31 (the zlib FCHECK rule).
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Read an object from *f*, auto-detecting legacy vs. new format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Set this object's attributes from serialized chunks (subclass hook)."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Return this object's serialized chunks (subclass hook)."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip: re-parse our own serialization; any parse error
            # means the object is malformed.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the object header (type name, space, length, NUL)."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id
class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    # For blobs the chunked text is always a concrete list (never None).
    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new Blob object."""
        super().__init__()
        self._chunked_text = []
        # A blob's serialization is exactly its content, so no separate
        # serialization pass is ever needed.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        # Identity: blobs store raw content as their serialization.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # BUG FIX: the previous chunk-carrying loop merged an already
        # complete line with the start of the next chunk whenever a chunk
        # ended exactly on a newline (e.g. chunks [b"a\n", b"b"] yielded
        # [b"a\nb"] instead of [b"a\n", b"b"]).  Joining first guarantees
        # the result always matches the single-chunk behaviour and still
        # treats a \r\n pair split across a chunk boundary as one ending.
        joined: list[bytes] = b"".join(chunks).splitlines(True)
        return joined
def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[Union[tuple[None, None], tuple[Optional[bytes], bytes]]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
        order read from the text, possibly including duplicates. Includes a
        field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None
    v = b""
    eof = False

    def _strip_last_newline(value: bytes) -> bytes:
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)

    # NOTE: this ``else`` belongs to the ``for`` loop above — it only runs
    # when the loop exhausted the file without hitting ``break``.
    else:
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()
def _format_message(
    headers: Sequence[tuple[bytes, bytes]], body: Optional[bytes]
) -> Iterator[bytes]:
    """Yield the serialized lines of a header/body git message.

    Multi-line header values are continued on lines prefixed with a single
    space, mirroring the format that _parse_message reads back.
    """
    for field, value in headers:
        first, *continuation = value.split(b"\n")
        yield git_line(field, first)
        for extra in continuation:
            yield b" " + extra + b"\n"
    yield b"\n"  # There must be a new line after the headers
    if body:
        yield body
class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    # Freeform tag message (None when the tag has no message).
    _message: Optional[bytes]
    # Name of the tag.
    _name: Optional[bytes]
    # ShaFile subclass of the object this tag points to.
    _object_class: Optional[type["ShaFile"]]
    # Hex sha of the tagged object.
    _object_sha: Optional[bytes]
    # Detached PGP/SSH signature appended to the body, if any.
    _signature: Optional[bytes]
    # Tag creation time, as seconds since the epoch.
    _tag_time: Optional[int]
    # Timezone offset of the tag time (format as produced by
    # parse_time_entry/format_time_entry — presumably seconds; confirm there).
    _tag_timezone: Optional[int]
    # Whether the timezone was written in its negative-UTC form.
    _tag_timezone_neg_utc: Optional[bool]
    # Identity of the person who created the tag.
    _tagger: Optional[bytes]
    def __init__(self) -> None:
        """Initialize a new Tag object."""
        super().__init__()
        # Tagger and timestamp are optional in the tag format; start unset.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: Optional[bytes] = None
917 @classmethod
918 def from_path(cls, filename: Union[str, bytes]) -> "Tag":
919 """Read a tag from a file on disk.
921 Args:
922 filename: Path to the tag file
924 Returns:
925 A Tag object
927 Raises:
928 NotTagError: If the file is not a tag
929 """
930 tag = ShaFile.from_path(filename)
931 if not isinstance(tag, cls):
932 raise NotTagError(_path_to_bytes(filename))
933 return tag
    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce the required header ordering:
        # object first, then type, then tag, then (optional) tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field
    def _serialize(self) -> list[bytes]:
        """Serialize tag headers, message and signature into chunks.

        Raises:
          ObjectFormatException: if a mandatory field is unset
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp: emit the identity alone.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        # A detached signature, when present, is simply appended to the body.
        if self.message is None and self._signature is None:
            body = None
        else:
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))
    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag.

        Args:
          chunks: Serialized chunks of the tag object
        """
        # Reset optional fields so re-deserializing fully overwrites state.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # The None field carries the freeform body: the message plus
                # an optional trailing PGP or SSH signature block.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        # Split body at the signature armor header.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )
1059 def _get_object(self) -> tuple[type[ShaFile], bytes]:
1060 """Get the object pointed to by this tag.
1062 Returns: tuple of (object class, sha).
1063 """
1064 if self._object_class is None or self._object_sha is None:
1065 raise ValueError("Tag object is not properly initialized")
1066 return (self._object_class, self._object_sha)
    def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
        # Changing the target invalidates the cached serialization.
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")
1089 def sign(self, keyid: Optional[str] = None) -> None:
1090 """Sign this tag with a GPG key.
1092 Args:
1093 keyid: Optional GPG key ID to use for signing. If not specified,
1094 the default GPG key will be used.
1095 """
1096 import gpg
1098 with gpg.Context(armor=True) as c:
1099 if keyid is not None:
1100 key = c.get_key(keyid)
1101 with gpg.Context(armor=True, signers=[key]) as ctx:
1102 self.signature, _unused_result = ctx.sign(
1103 self.as_raw_string(),
1104 mode=gpg.constants.sig.mode.DETACH,
1105 )
1106 else:
1107 self.signature, _unused_result = c.sign(
1108 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
1109 )
1111 def raw_without_sig(self) -> bytes:
1112 """Return raw string serialization without the GPG/SSH signature.
1114 self.signature is a signature for the returned raw byte string serialization.
1115 """
1116 ret = self.as_raw_string()
1117 if self._signature:
1118 ret = ret[: -len(self._signature)]
1119 return ret
1121 def extract_signature(self) -> tuple[bytes, Optional[bytes], Optional[bytes]]:
1122 """Extract the payload, signature, and signature type from this tag.
1124 Returns:
1125 Tuple of (payload, signature, signature_type) where:
1126 - payload: The raw tag data without the signature
1127 - signature: The signature bytes if present, None otherwise
1128 - signature_type: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature
1130 Raises:
1131 ObjectFormatException: If signature has unknown format
1132 """
1133 if self._signature is None:
1134 return self.as_raw_string(), None, None
1136 payload = self.raw_without_sig()
1138 # Determine signature type
1139 if self._signature.startswith(BEGIN_PGP_SIGNATURE):
1140 sig_type = SIGNATURE_PGP
1141 elif self._signature.startswith(BEGIN_SSH_SIGNATURE):
1142 sig_type = SIGNATURE_SSH
1143 else:
1144 raise ObjectFormatException("Unknown signature format")
1146 return payload, self._signature, sig_type
1148 def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
1149 """Verify GPG signature for this tag (if it is signed).
1151 Args:
1152 keyids: Optional iterable of trusted keyids for this tag.
1153 If this tag is not signed by any key in keyids verification will
1154 fail. If not specified, this function only verifies that the tag
1155 has a valid signature.
1157 Raises:
1158 gpg.errors.BadSignatures: if GPG signature verification fails
1159 gpg.errors.MissingSignatures: if tag was not signed by a key
1160 specified in keyids
1161 """
1162 if self._signature is None:
1163 return
1165 import gpg
1167 with gpg.Context() as ctx:
1168 data, result = ctx.verify(
1169 self.raw_without_sig(),
1170 signature=self._signature,
1171 )
1172 if keyids:
1173 keys = [ctx.get_key(key) for key in keyids]
1174 for key in keys:
1175 for subkey in key.subkeys:
1176 for sig in result.signatures:
1177 if subkey.can_sign and subkey.fpr == sig.fpr:
1178 return
1179 raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes  # entry name (or slash-joined path), as bytes
    mode: int  # git file mode, e.g. 0o100644
    sha: bytes  # object SHA

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
            path: Directory path (bytes) to join in front of this entry's path.

        Returns:
            New TreeEntry with the combined path, same mode and sha.

        Raises:
            TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the value that failed the check (self.path);
            # previously the message interpolated the argument instead.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
        text: Serialized text to parse
        strict: If True, enforce strict validation
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
        ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Each entry is: "<octal mode> <name>\0<20-byte binary sha>".
        mode_end = text.index(b" ", pos)
        mode_text = text[pos:mode_end]
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
        name_end = text.index(b"\0", mode_end)
        name = text[mode_end + 1 : name_end]
        pos = name_end + 21
        sha = text[name_end + 1 : pos]
        if len(sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(sha))
def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
        items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # "<octal mode> <name>\0<binary sha>", one chunk per entry.
        mode_bytes = f"{mode:04o}".encode("ascii")
        yield b"".join([mode_bytes, b" ", name, b"\0", hex_to_sha(hexsha)])
def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
        name_order: If True, iterate entries in order of their name. If
            False, iterate entries in tree order, that is, treat subtree entries as
            having '/' appended.
        entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, entry in sorted(entries.items(), key=key_func):
        raw_mode, hexsha = entry
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(raw_mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Args:
        entry: (name, value) tuple
    """
    name, (mode, _sha) = entry
    # Subtrees sort as if their name ended with '/'.
    return name + b"/" if stat.S_ISDIR(mode) else name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
        name: Name of the directory entry
        mode: Mode of entry
        hexsha: Hexsha of the referenced object
        encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # Directories are shown as "tree", everything else as "blob".
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    decoded_name = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {hexsha.decode('ascii')}\t{decoded_name}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Initialize SubmoduleEncountered exception.

        Args:
            path: Path where the submodule was encountered
            sha: SHA of the submodule
        """
        # Stash both pieces so the caller can resume resolution
        # inside the submodule if it wants to.
        self.sha = sha
        self.path = path
class Tree(ShaFile):
    """A Git tree object.

    Behaves like a mutable mapping from entry name (bytes) to
    (mode, hexsha) tuples.
    """

    type_name = b"tree"
    type_num = 2

    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        # Maps entry name -> (mode, hexsha).
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: Union[str, bytes]) -> "Tree":
        """Read a tree from a file on disk.

        Args:
            filename: Path to the tree file

        Returns:
            A Tree object

        Raises:
            NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name.

        Returns: (mode, hexsha) tuple for the named entry.
        """
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
            name: The name of the entry, as a string.
            value: A tuple of (mode, hexsha), where mode is the mode of the
                entry as an integral type and hexsha is the hex SHA of the entry as
                a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        # Cached serialization is now stale.
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
            mode: The mode of the entry as an integral type. Not all
                possible modes are supported by git; see check() for details.
            name: The name of the entry, as a string.
            hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
            name_order: If True, iterate in name order instead of tree
                order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # The only file modes git itself will produce.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be sorted in tree order with no duplicates.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize entries in tree order as a list of chunks."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
            Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
            lookup_obj: Callback for retrieving object by SHA1
            path: Path to lookup

        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
            SubmoduleEncountered: if a gitlink is hit partway through the path
            NotTreeError: if an intermediate component is not a tree
            ValueError: if no path component resolved to an entry
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: Optional[int] = None
        for i, p in enumerate(parts):
            # Skip empty components (leading/double slashes).
            if not p:
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
        text: Text to parse.

    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).

    Raises:
        ValueError: if the fragment does not start with '+' or '-', or the
            remainder is not an integer.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        # f-string replaces the old ``"...{text}...".format(**vars())``
        # anti-idiom; the rendered message is identical.
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # git distinguishes "-0000" from "+0000"; remember the redundant sign.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # Offset is HHMM encoded as a decimal number; divmod is exact
    # (the old ``int(offset / 100)`` went through float division).
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
        offset: Timezone offset as seconds difference to UTC
        unnecessary_negative_timezone: Whether to use a minus sign for
            UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).

    Returns: ASCII bytes such as b'+0100'.

    Raises:
        ValueError: if offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Exact integer arithmetic; the old %-formatting pushed floats
    # through %d (hence its ``noqa: UP031``).
    hours, minutes = divmod(offset // 60, 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")
def parse_time_entry(
    value: bytes,
) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
    """Parse event.

    Args:
        value: Bytes representing a git commit/tag line
    Raises:
        ObjectFormatException in case of parsing error (malformed
        field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: treat the whole value as the identity,
        # with no timestamp information.
        return (value, None, (None, False))
    person = value[: sep + 1]
    rest = value[sep + 2 :]
    try:
        timetext, timezonetext = rest.rsplit(b" ", 1)
        when = int(timetext)
        tz_offset, tz_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, when, (tz_offset, tz_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event as b"<person> <timestamp> <timezone>"."""
    offset, neg_utc = timezone_info
    return (
        person
        + b" "
        + str(time).encode("ascii")
        + b" "
        + format_timezone(offset, neg_utc)
    )
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    Optional[bytes],
    list[bytes],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
    Optional[bytes],
    list[Tag],
    Optional[bytes],
    Optional[bytes],
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated standalone parser; Commit._deserialize implements the
    same field handling.

    Args:
        chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)

    Raises:
        ObjectFormatException: if a header that requires a value has none
    """
    parents = []
    extra = []
    tree = None
    author_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    commit_info: tuple[
        Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
    ] = (None, None, (None, None))
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # Mergetag values are embedded tag objects (newline re-added).
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # The unheadered remainder is the commit message.
            message = value
        else:
            # Unknown headers are kept verbatim for reserialization.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )
class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Initialize an empty Commit."""
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: Optional[bytes] = None
        self._mergetag: list[Tag] = []
        self._gpgsig: Optional[bytes] = None
        # Unknown headers, kept verbatim for round-tripping.
        self._extra: list[tuple[bytes, Optional[bytes]]] = []
        self._author_timezone_neg_utc: Optional[bool] = False
        self._commit_timezone_neg_utc: Optional[bool] = False

    @classmethod
    def from_path(cls, path: Union[str, bytes]) -> "Commit":
        """Read a commit from a file on disk.

        Args:
            path: Path to the commit file

        Returns:
            A Commit object

        Raises:
            NotCommitError: If the file is not a commit
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Parse commit headers and message out of serialized chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        commit_info: tuple[
            Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
        ] = (None, None, (None, None))
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Mergetag values are embedded tag objects (newline re-added).
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                self._message = value
            else:
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Enforce the header order git itself writes:
        # tree, parent*, author, committer, encoding.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: Optional[str] = None) -> None:
        """Sign this commit with a GPG key.

        Args:
            keyid: Optional GPG key ID to use for signing. If not specified,
                the default GPG key will be used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, _unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, _unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Serialize a copy with the signature cleared, so the gpgsig
        # header is omitted entirely.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def extract_signature(self) -> tuple[bytes, Optional[bytes], Optional[bytes]]:
        """Extract the payload, signature, and signature type from this commit.

        Returns:
            Tuple of (payload, signature, signature_type) where:
            - payload: The raw commit data without the signature
            - signature: The signature bytes if present, None otherwise
            - signature_type: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
            ObjectFormatException: If signature has unknown format
        """
        if self._gpgsig is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Determine signature type
        if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE):
            sig_type = SIGNATURE_PGP
        elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE):
            sig_type = SIGNATURE_SSH
        else:
            raise ObjectFormatException("Unknown signature format")

        return payload, self._gpgsig, sig_type

    def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
            keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
            gpg.errors.BadSignatures: if GPG signature verification fails
            gpg.errors.MissingSignatures: if commit was not signed by a key
                specified in keyids
        """
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # Raises gpg.errors.BadSignatures if the signature is invalid.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                # Accept only if a signing-capable subkey of a trusted key
                # matches one of the signature fingerprints.
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message in canonical commit order."""
        headers = []
        assert self._tree is not None
        # _tree may hold either a Tree object or a raw hex sha.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Drop the trailing newline; _format_message re-frames the value.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
# All concrete git object types handled by this module.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Lookup table from both the textual type name (b"commit", ...) and the
# numeric type id (1, 2, 3, 4) to the corresponding class.
_TYPE_MAP: dict[Union[bytes, int], type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls
# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Extension module not built/installed; keep the pure-Python versions.
    pass
else:
    # Shadow the pure-Python functions with the faster Rust implementations.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs