Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# objects.py -- Access to base git objects
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Access to base git objects."""
25import binascii
26import os
27import posixpath
28import stat
29import sys
30import zlib
31from collections.abc import Callable, Iterable, Iterator, Sequence
32from hashlib import sha1
33from io import BufferedIOBase, BytesIO
34from typing import (
35 IO,
36 TYPE_CHECKING,
37 NamedTuple,
38 TypeVar,
39 Union,
40)
42if sys.version_info >= (3, 11):
43 from typing import Self
44else:
45 from typing_extensions import Self
47from typing import TypeGuard
49from . import replace_me
50from .errors import (
51 ChecksumMismatch,
52 FileFormatException,
53 NotBlobError,
54 NotCommitError,
55 NotTagError,
56 NotTreeError,
57 ObjectFormatException,
58)
59from .file import GitFile
61if TYPE_CHECKING:
62 from _hashlib import HASH
64 from .file import _GitFile
# The all-zero sha, used by git to denote a nonexistent object.
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"

# File mode used by git for submodule (gitlink) entries; compared against
# stat.S_IFMT(mode) in S_ISGITLINK().
S_IFGITLINK = 0o160000

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# Armor markers locating the start of a detached signature appended to a
# tag/commit body (see Tag._deserialize).
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"

# Alias for object identifiers (hex shas are 40 ASCII hex digits as bytes).
ObjectID = bytes
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised e.g. by ``ShaFile._parse_file`` when a loose object file on
    disk turns out to be zero-length.
    """
def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    file_type = stat.S_IFMT(m)
    return file_type == S_IFGITLINK
114def _decompress(string: bytes) -> bytes:
115 dcomp = zlib.decompressobj()
116 dcomped = dcomp.decompress(string)
117 dcomped += dcomp.flush()
118 return dcomped
def sha_to_hex(sha: ObjectID) -> bytes:
    """Takes a string and returns the hex of the sha within."""
    encoded = binascii.hexlify(sha)
    assert len(encoded) == 40, f"Incorrect length of sha1 string: {encoded!r}"
    return encoded
128def hex_to_sha(hex: bytes | str) -> bytes:
129 """Takes a hex sha and returns a binary sha."""
130 assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
131 try:
132 return binascii.unhexlify(hex)
133 except TypeError as exc:
134 if not isinstance(hex, bytes):
135 raise
136 raise ValueError(exc.args[0]) from exc
139def valid_hexsha(hex: bytes | str) -> bool:
140 """Check if a string is a valid hex SHA.
142 Args:
143 hex: Hex string to check
145 Returns:
146 True if valid hex SHA, False otherwise
147 """
148 if len(hex) != 40:
149 return False
150 try:
151 binascii.unhexlify(hex)
152 except (TypeError, binascii.Error):
153 return False
154 else:
155 return True
158PathT = TypeVar("PathT", str, bytes)
161def hex_to_filename(path: PathT, hex: str | bytes) -> PathT:
162 """Takes a hex sha and returns its filename relative to the given path."""
163 # os.path.join accepts bytes or unicode, but all args must be of the same
164 # type. Make sure that hex which is expected to be bytes, is the same type
165 # as path.
166 if isinstance(path, str):
167 if isinstance(hex, bytes):
168 hex_str = hex.decode("ascii")
169 else:
170 hex_str = hex
171 dir_name = hex_str[:2]
172 file_name = hex_str[2:]
173 result = os.path.join(path, dir_name, file_name)
174 assert isinstance(result, str)
175 return result
176 else:
177 # path is bytes
178 if isinstance(hex, str):
179 hex_bytes = hex.encode("ascii")
180 else:
181 hex_bytes = hex
182 dir_name_b = hex_bytes[:2]
183 file_name_b = hex_bytes[2:]
184 result_b = os.path.join(path, dir_name_b, file_name_b)
185 assert isinstance(result_b, bytes)
186 return result_b
def filename_to_hex(filename: str | bytes) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # A loose object lives at <path>/<2-char dir>/<38-char name>; recombine
    # the last two path components to recover the 40-character hex sha.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        parts = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        prefix, suffix = parts
        assert len(prefix) == 2 and len(suffix) == 38, errmsg
        hex_bytes = (prefix + suffix).encode("ascii")
    else:
        # filename is bytes
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        parts_b = filename.rsplit(sep, 2)[-2:]
        assert len(parts_b) == 2, errmsg
        prefix_b, suffix_b = parts_b
        assert len(prefix_b) == 2 and len(suffix_b) == 38, errmsg
        hex_bytes = prefix_b + suffix_b
    hex_to_sha(hex_bytes)  # validates; raises on non-hex input
    return hex_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    return b"".join([cls.type_name, b" ", str(length).encode("ascii"), b"\0"])
222def serializable_property(name: str, docstring: str | None = None) -> property:
223 """A property that helps tracking whether serialization is necessary."""
225 def set(obj: "ShaFile", value: object) -> None:
226 """Set the property value and mark the object as needing serialization.
228 Args:
229 obj: The ShaFile object
230 value: The value to set
231 """
232 setattr(obj, "_" + name, value)
233 obj._needs_serialization = True
235 def get(obj: "ShaFile") -> object:
236 """Get the property value.
238 Args:
239 obj: The ShaFile object
241 Returns:
242 The property value
243 """
244 return getattr(obj, "_" + name)
246 return property(get, set, doc=docstring)
def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
      type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)
def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
def check_identity(identity: bytes | None, error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    Args:
      identity: Identity string of the form ``Name <email>``
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is missing or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Evaluate the checks lazily.  The previous eager-list version indexed
    # identity[email_start - 1] even when no b"<" was present, so inputs
    # like b"" raised IndexError instead of ObjectFormatException.
    if (
        email_start < 1
        or identity[email_start - 1 : email_start] != b" "
        or identity.find(b"<", email_start + 1) != -1
        or email_end != len(identity) - 1
        or b"\0" in identity
        or b"\n" in identity
    ):
        raise ObjectFormatException(error_msg)
299def _path_to_bytes(path: str | bytes) -> bytes:
300 """Convert a path to bytes for use in error messages."""
301 if isinstance(path, str):
302 return path.encode("utf-8", "surrogateescape")
303 return path
def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds
    Raises:
      ObjectFormatException: if the timestamp exceeds the signed 64-bit max
    """
    # Prevent overflow error
    if time_seconds > MAX_TIME:
        raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    line = b" ".join(items)
    return line + b"\n"
325class FixedSha:
326 """SHA object that behaves like hashlib's but is given a fixed value."""
328 __slots__ = ("_hexsha", "_sha")
330 def __init__(self, hexsha: str | bytes) -> None:
331 """Initialize FixedSha with a fixed SHA value.
333 Args:
334 hexsha: Hex SHA value as string or bytes
335 """
336 if isinstance(hexsha, str):
337 hexsha = hexsha.encode("ascii")
338 if not isinstance(hexsha, bytes):
339 raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
340 self._hexsha = hexsha
341 self._sha = hex_to_sha(hexsha)
343 def digest(self) -> bytes:
344 """Return the raw SHA digest."""
345 return self._sha
347 def hexdigest(self) -> str:
348 """Return the hex SHA digest."""
349 return self._hexsha.decode("ascii")
# Type guard functions for runtime type narrowing.  The two branches must
# stay behaviorally identical: only the declared return type differs
# (TypeGuard lets static checkers narrow a ShaFile to the subclass).
if TYPE_CHECKING:

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
class ShaFile:
    """A git SHA file.

    Base class for all git object types (blob, tree, commit, tag).  It
    caches both the serialized chunk list and the computed SHA-1, and
    tracks (via ``_needs_serialization``) when the cache is stale.
    """

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when the chunked representation is stale relative to the
    # deserialized attributes and must be rebuilt by _serialize().
    _needs_serialization: bool
    # Per-subclass object type identifiers (e.g. b"blob" / 3).
    type_name: bytes
    type_num: int
    # Cached serialized representation; may be None until (re)serialized.
    _chunked_text: list[bytes] | None
    # Cached SHA: a hashlib object, a caller-supplied FixedSha, or None
    # when it must be recomputed.
    _sha: Union[FixedSha, None, "HASH"]

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file.

        Args:
          magic: Bytes already read from the start of the file.
          f: File-like object to read more bytes from as needed.
        Returns: an empty instance of the subclass named in the header
        Raises:
          ObjectFormatException: if the size is not an integer or the
            type name is unknown
        """
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Inflate until the NUL terminating the "<type> <size>\0" header
        # appears in the decompressed stream.
        # NOTE(review): if f.read() returns b"" (truncated stream) before
        # any NUL is found, this loops forever — confirm upstream intent.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL header terminator is the payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level (-1 = library default)
        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate the cached SHA before rebuilding the chunks.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object (based on its hex sha)."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a serialized string.

        Args:
          text: Raw uncompressed serialization
          sha: Optional known hex sha, avoiding recomputation
        """
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(self, chunks: list[bytes], sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a list of chunks.

        Args:
          chunks: Raw uncompressed serialization, in chunks
          sha: Optional known hex sha, avoiding recomputation
        """
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            # Trust the caller-supplied sha instead of hashing the chunks.
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # Bits 4-6 of the first byte carry the numeric object type.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is varint-encoded: the high bit set on a byte means
        # another size byte follows.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True if ``magic`` looks like a legacy (zlib-deflated) object."""
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        # A zlib stream header has compression method 8 in the low nibble
        # and its first two bytes, read big-endian, are a multiple of 31.
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Read an object from an open file, auto-detecting its format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate attributes from serialized chunks (subclass hook)."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Produce serialized chunks from attributes (subclass hook)."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: str | bytes) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip the serialization; any parse error means the
            # object is malformed.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the "<type> <size>\\0" header for this object."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> Union[FixedSha, "HASH"]:
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id
class Blob(ShaFile):
    """A Git Blob object.

    A blob stores raw file contents; its chunk list *is* its
    serialization.
    """

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new, empty Blob object."""
        super().__init__()
        self._chunked_text = []
        # The chunks are already the serialized form, so a fresh blob is
        # never "dirty".
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: str | bytes) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # BUGFIX: the previous implementation split each chunk separately
        # and unconditionally glued the carried-over tail onto the next
        # chunk's content.  When a chunk boundary coincided with a line
        # boundary (e.g. chunks [b"a\n", b"b\n"]) adjacent lines were
        # merged into one.  Re-splitting the tail together with each new
        # chunk keeps every boundary correct, including a b"\r\n" pair
        # that is split across two chunks.
        ret: list[bytes] = []
        tail = b""
        for chunk in chunks:
            lines = (tail + chunk).splitlines(True)
            if not lines:
                continue
            # The final piece may be unterminated, or may end in b"\r"
            # that pairs with a b"\n" at the start of the next chunk, so
            # carry it over and re-split it on the next iteration.
            tail = lines.pop()
            ret.extend(lines)
        if tail:
            ret.append(tail)
        return ret
805def _parse_message(
806 chunks: Iterable[bytes],
807) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]:
808 """Parse a message with a list of fields and a body.
810 Args:
811 chunks: the raw chunks of the tag or commit object.
812 Returns: iterator of tuples of (field, value), one per header line, in the
813 order read from the text, possibly including duplicates. Includes a
814 field named None for the freeform tag/commit text.
815 """
816 f = BytesIO(b"".join(chunks))
817 k = None
818 v = b""
819 eof = False
821 def _strip_last_newline(value: bytes) -> bytes:
822 """Strip the last newline from value."""
823 if value and value.endswith(b"\n"):
824 return value[:-1]
825 return value
827 # Parse the headers
828 #
829 # Headers can contain newlines. The next line is indented with a space.
830 # We store the latest key as 'k', and the accumulated value as 'v'.
831 for line in f:
832 if line.startswith(b" "):
833 # Indented continuation of the previous line
834 v += line[1:]
835 else:
836 if k is not None:
837 # We parsed a new header, return its value
838 yield (k, _strip_last_newline(v))
839 if line == b"\n":
840 # Empty line indicates end of headers
841 break
842 (k, v) = line.split(b" ", 1)
844 else:
845 # We reached end of file before the headers ended. We still need to
846 # return the previous header, then we need to return a None field for
847 # the text.
848 eof = True
849 if k is not None:
850 yield (k, _strip_last_newline(v))
851 yield (None, None)
853 if not eof:
854 # We didn't reach the end of file while parsing headers. We can return
855 # the rest of the file as a message.
856 yield (None, f.read())
858 f.close()
861def _format_message(
862 headers: Sequence[tuple[bytes, bytes]], body: bytes | None
863) -> Iterator[bytes]:
864 for field, value in headers:
865 lines = value.split(b"\n")
866 yield git_line(field, lines[0])
867 for line in lines[1:]:
868 yield b" " + line + b"\n"
869 yield b"\n" # There must be a new line after the headers
870 if body:
871 yield body
class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    # Free-form tag message, without any trailing detached signature.
    _message: bytes | None
    # Tag name from the "tag" header.
    _name: bytes | None
    # ShaFile subclass of the tagged object (from the "type" header).
    _object_class: type["ShaFile"] | None
    # Hex sha of the tagged object (from the "object" header).
    _object_sha: bytes | None
    # Detached PGP/SSH signature appended to the message body, if any.
    _signature: bytes | None
    # Tag creation time as seconds since the epoch.
    _tag_time: int | None
    # Timezone offset of the tag time (units per parse_time_entry —
    # presumably seconds; confirm against that helper).
    _tag_timezone: int | None
    # Whether the timezone was written as negative UTC (-0000), which is
    # only distinguishable from +0000 in the serialized form — TODO confirm.
    _tag_timezone_neg_utc: bool | None
    # Identity of the tagger, e.g. b"Name <email>" (see check_identity).
    _tagger: bytes | None
902 def __init__(self) -> None:
903 """Initialize a new Tag object."""
904 super().__init__()
905 self._tagger = None
906 self._tag_time = None
907 self._tag_timezone = None
908 self._tag_timezone_neg_utc = False
909 self._signature: bytes | None = None
911 @classmethod
912 def from_path(cls, filename: str | bytes) -> "Tag":
913 """Read a tag from a file on disk.
915 Args:
916 filename: Path to the tag file
918 Returns:
919 A Tag object
921 Raises:
922 NotTagError: If the file is not a tag
923 """
924 tag = ShaFile.from_path(filename)
925 if not isinstance(tag, cls):
926 raise NotTagError(_path_to_bytes(filename))
927 return tag
    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce canonical header order: object, type, tag, then tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field
    def _serialize(self) -> list[bytes]:
        """Serialize this tag into header and body chunks.

        Raises:
          ObjectFormatException: if a mandatory field (object sha, object
            class, name) is missing, or timezone info is absent while a
            tag time is set
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp: emit the identity alone.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        # The signature, when present, is appended verbatim to the message
        # so that it round-trips through _deserialize().
        if self.message is None and self._signature is None:
            body = None
        else:
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))
    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag.

        Args:
          chunks: serialized chunks of the tag object
        Raises:
          ObjectFormatException: on an unknown object type, a missing
            tagger value, or an unknown header field
        """
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # The None field carries the free-form body (or None when
                # there is no body at all).
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        # Split the body into message and detached signature.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )
1053 def _get_object(self) -> tuple[type[ShaFile], bytes]:
1054 """Get the object pointed to by this tag.
1056 Returns: tuple of (object class, sha).
1057 """
1058 if self._object_class is None or self._object_sha is None:
1059 raise ValueError("Tag object is not properly initialized")
1060 return (self._object_class, self._object_sha)
1062 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
1063 (self._object_class, self._object_sha) = value
1064 self._needs_serialization = True
    # Read/write view of the (object class, sha) pair this tag points at.
    object = property(_get_object, _set_object)

    # Serializable properties: assigning any of these marks the tag as
    # needing re-serialization (see serializable_property).
    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")
1083 def sign(self, keyid: str | None = None) -> None:
1084 """Sign this tag with a GPG key.
1086 Args:
1087 keyid: Optional GPG key ID to use for signing. If not specified,
1088 the default GPG key will be used.
1089 """
1090 import gpg
1092 with gpg.Context(armor=True) as c:
1093 if keyid is not None:
1094 key = c.get_key(keyid)
1095 with gpg.Context(armor=True, signers=[key]) as ctx:
1096 self.signature, _unused_result = ctx.sign(
1097 self.as_raw_string(),
1098 mode=gpg.constants.sig.mode.DETACH,
1099 )
1100 else:
1101 self.signature, _unused_result = c.sign(
1102 self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
1103 )
1105 def raw_without_sig(self) -> bytes:
1106 """Return raw string serialization without the GPG/SSH signature.
1108 self.signature is a signature for the returned raw byte string serialization.
1109 """
1110 ret = self.as_raw_string()
1111 if self._signature:
1112 ret = ret[: -len(self._signature)]
1113 return ret
1115 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
1116 """Extract the payload, signature, and signature type from this tag.
1118 Returns:
1119 Tuple of (``payload``, ``signature``, ``signature_type``) where:
1121 - ``payload``: The raw tag data without the signature
1122 - ``signature``: The signature bytes if present, None otherwise
1123 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature
1125 Raises:
1126 ObjectFormatException: If signature has unknown format
1127 """
1128 if self._signature is None:
1129 return self.as_raw_string(), None, None
1131 payload = self.raw_without_sig()
1133 # Determine signature type
1134 if self._signature.startswith(BEGIN_PGP_SIGNATURE):
1135 sig_type = SIGNATURE_PGP
1136 elif self._signature.startswith(BEGIN_SSH_SIGNATURE):
1137 sig_type = SIGNATURE_SSH
1138 else:
1139 raise ObjectFormatException("Unknown signature format")
1141 return payload, self._signature, sig_type
1143 def verify(self, keyids: Iterable[str] | None = None) -> None:
1144 """Verify GPG signature for this tag (if it is signed).
1146 Args:
1147 keyids: Optional iterable of trusted keyids for this tag.
1148 If this tag is not signed by any key in keyids verification will
1149 fail. If not specified, this function only verifies that the tag
1150 has a valid signature.
1152 Raises:
1153 gpg.errors.BadSignatures: if GPG signature verification fails
1154 gpg.errors.MissingSignatures: if tag was not signed by a key
1155 specified in keyids
1156 """
1157 if self._signature is None:
1158 return
1160 import gpg
1162 with gpg.Context() as ctx:
1163 data, result = ctx.verify(
1164 self.raw_without_sig(),
1165 signature=self._signature,
1166 )
1167 if keyids:
1168 keys = [ctx.get_key(key) for key in keyids]
1169 for key in keys:
1170 for subkey in key.subkeys:
1171 for sig in result.signatures:
1172 if subkey.can_sign and subkey.fpr == sig.fpr:
1173 return
1174 raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes  # entry name (no directory prefix), as bytes
    mode: int  # git file mode, e.g. 0o100644
    sha: bytes  # hex SHA of the referenced object

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
            path: Directory prefix to prepend to this entry's path.

        Raises:
            TypeError: if this entry's path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the offending value (self.path) rather than the
            # prefix argument, which is what is actually being validated.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Args:
      text: Serialized text to parse
      strict: If True, enforce strict validation
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    # Each entry is "<octal mode> <name>\0<20-byte binary sha>".
    while pos < end:
        mode_end = text.index(b" ", pos)
        mode_text = text[pos:mode_end]
        # git never writes leading zeros; reject them in strict mode.
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
        name_end = text.index(b"\0", mode_end)
        name = text[mode_end + 1 : name_end]
        pos = name_end + 21
        raw_sha = text[name_end + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))
def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # Entry layout: "<octal mode> <name>\0<binary sha>".
        mode_bytes = (f"{mode:04o}").encode("ascii")
        yield b"".join([mode_bytes, b" ", name, b"\0", hex_to_sha(hexsha)])
def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Args:
      entry: (name, value) tuple
    """
    name, (mode, _sha) = entry
    # Directories sort as if their name ended in '/', matching git tree order.
    return name + b"/" if stat.S_ISDIR(mode) else name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order."""
    name, _value = entry
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # Anything with the directory bit set is shown as a tree; everything
    # else (regular files, symlinks, gitlinks) as a blob.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Initialize SubmoduleEncountered exception.

        Args:
            path: Path where the submodule was encountered
            sha: SHA of the submodule
        """
        # super().__init__() is deliberately not called with args, so str()
        # of this exception is empty; callers read .path and .sha directly.
        self.path = path
        self.sha = sha
class Tree(ShaFile):
    """A Git tree object.

    Maps entry names (bytes) to (mode, hexsha) tuples and supports a
    dict-like interface plus git-specific serialization.
    """

    type_name = b"tree"
    type_num = 2

    # Single slot holding the name -> (mode, hexsha) mapping.
    __slots__ = ("_entries",)

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: str | bytes) -> "Tree":
        """Read a tree from a file on disk.

        Args:
            filename: Path to the tree file

        Returns:
            A Tree object

        Raises:
            NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
            name: The name of the entry, as a string.
            value: A tuple of (mode, hexsha), where mode is the mode of the
                entry as an integral type and hexsha is the hex SHA of the entry as
                a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
            mode: The mode of the entry as an integral type. Not all
                possible modes are supported by git; see check() for details.
            name: The name of the entry, as a string.
            hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
            name_order: If True, iterate in name order instead of tree
                order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree.

        Raises:
            ObjectFormatException: if the serialized tree is malformed.
        """
        try:
            # Bug fix: the pure-Python parse_tree is a lazy generator, so
            # parsing errors surface while the dict is being built, not when
            # the generator is created. The consumption must therefore stay
            # inside the try block for ValueError to be translated.
            self._entries = {
                n: (m, s) for n, m, s in parse_tree(b"".join(chunks))
            }
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # Modes git actually writes into trees.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be sorted in tree order (see key_entry) and unique.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize entries in canonical git tree order."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
            Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
            lookup_obj: Callback for retrieving object by SHA1
            path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
            SubmoduleEncountered: if a gitlink is hit before the path is
                fully resolved.
            NotTreeError: if an intermediate component is not a tree.
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/duplicate slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    raw = int(text[1:])
    if sign == b"-":
        raw = -raw
    # "-0000" must round-trip with its minus sign intact.
    unnecessary_negative_timezone = raw >= 0 and sign == b"-"
    signum = -1 if raw < 0 else 1
    magnitude = abs(raw)
    hours, minutes = divmod(magnitude, 100)
    return (signum * (hours * 3600 + minutes * 60), unnecessary_negative_timezone)
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # offset is now non-negative and a whole number of minutes.
    hours, minutes = divmod(offset // 60, 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")
def parse_time_entry(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse event.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is the identity, with no
        # timestamp information attached.
        return (value, None, (None, False))
    try:
        person = value[: sep + 1]
        timetext, timezonetext = value[sep + 2 :].rsplit(b" ", 1)
        time = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, time, (timezone, timezone_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event as "<person> <timestamp> <timezone>"."""
    timezone, timezone_neg_utc = timezone_info
    tz_bytes = format_timezone(timezone, timezone_neg_utc)
    return person + b" " + str(time).encode("ascii") + b" " + tz_bytes
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    # Placeholder used for both author and committer until parsed.
    no_entry: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    tree = None
    parents: list[bytes] = []
    author_info = no_entry
    commit_info = no_entry
    encoding = None
    mergetag: list[Tag] = []
    gpgsig = None
    message = None
    extra: list[tuple[bytes, bytes]] = []

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # The body after the headers is the commit message.
            message = value
        else:
            # Unknown header: keep it verbatim so the object can round-trip.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))

    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )
class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Initialize an empty Commit."""
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: bytes | None = None
        self._mergetag: list[Tag] = []
        self._gpgsig: bytes | None = None
        self._extra: list[tuple[bytes, bytes | None]] = []
        self._author_timezone_neg_utc: bool | None = False
        self._commit_timezone_neg_utc: bool | None = False

    @classmethod
    def from_path(cls, path: str | bytes) -> "Commit":
        """Read a commit from a file on disk.

        Args:
            path: Path to the commit file

        Returns:
            A Commit object

        Raises:
            NotCommitError: If the file is not a commit
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate commit fields from the serialized chunks.

        Raises:
            ObjectFormatException: if author/committer headers lack values.
        """
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # A mergetag value is a nested tag object, minus its
                # trailing newline.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # The body after the headers is the commit message.
                self._message = value
            else:
                # Unknown header: kept verbatim for round-tripping.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
            ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Validate header ordering: tree, then parents, then author,
        # committer, encoding. `last` tracks the previous header seen.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: str | None = None) -> None:
        """Sign this commit with a GPG key.

        Args:
            keyid: Optional GPG key ID to use for signing. If not specified,
                the default GPG key will be used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, _unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, _unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Serialize a copy with the signature cleared, so the payload matches
        # what was originally signed.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this commit.

        Returns:
            Tuple of (``payload``, ``signature``, ``signature_type``) where:

            - ``payload``: The raw commit data without the signature
            - ``signature``: The signature bytes if present, None otherwise
            - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
            ObjectFormatException: If signature has unknown format
        """
        if self._gpgsig is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Determine signature type
        if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE):
            sig_type = SIGNATURE_PGP
        elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE):
            sig_type = SIGNATURE_SSH
        else:
            raise ObjectFormatException("Unknown signature format")

        return payload, self._gpgsig, sig_type

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
            keyids: Optional iterable of trusted keyids for this commit.
                If this commit is not signed by any key in keyids verification will
                fail. If not specified, this function only verifies that the commit
                has a valid signature.

        Raises:
            gpg.errors.BadSignatures: if GPG signature verification fails
            gpg.errors.MissingSignatures: if commit was not signed by a key
                specified in keyids
        """
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key made
                # one of the signatures on this commit.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message in canonical git commit order."""
        headers = []
        assert self._tree is not None
        # _tree may hold either a Tree object or a hex sha.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Strip the trailing newline; it is re-added on deserialization.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, bytes | None]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
# All concrete git object types, in type_num order (commit=1, tree=2,
# blob=3, tag=4).
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Lookup table mapping both the textual type name (e.g. b"commit") and the
# numeric type id (e.g. 1) to the corresponding ShaFile subclass.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls
# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Rust extension not built/installed; keep the pure-Python versions.
    pass
else:
    # Shadow the pure-Python implementations with the faster Rust ones.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs