Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# objects.py -- Access to base git objects
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Access to base git objects."""
25import binascii
26import os
27import posixpath
28import stat
29import sys
30import zlib
31from collections.abc import Callable, Iterable, Iterator, Sequence
32from hashlib import sha1
33from io import BufferedIOBase, BytesIO
34from typing import (
35 IO,
36 TYPE_CHECKING,
37 NamedTuple,
38 TypeVar,
39)
41if sys.version_info >= (3, 11):
42 from typing import Self
43else:
44 from typing_extensions import Self
46from typing import TypeGuard
48from . import replace_me
49from .errors import (
50 ChecksumMismatch,
51 FileFormatException,
52 NotBlobError,
53 NotCommitError,
54 NotTagError,
55 NotTreeError,
56 ObjectFormatException,
57)
58from .file import GitFile
60if TYPE_CHECKING:
61 from _hashlib import HASH
63 from .file import _GitFile
# The all-zero hex sha (40 ASCII zero digits).
ZERO_SHA = b"0" * 40

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"


# File mode bits marking a gitlink (submodule) entry; see S_ISGITLINK.
S_IFGITLINK = 0o160000


MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# ASCII-armor markers used to locate detached signatures in tag/commit text.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"


# Alias for bytes values used as object identifiers.
ObjectID = bytes
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised during loose-object parsing when a file contains no data at all.
    """
def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check

    Returns: a ``boolean``
    """
    file_type = stat.S_IFMT(m)
    return file_type == S_IFGITLINK
113def _decompress(string: bytes) -> bytes:
114 dcomp = zlib.decompressobj()
115 dcomped = dcomp.decompress(string)
116 dcomped += dcomp.flush()
117 return dcomped
def sha_to_hex(sha: ObjectID) -> bytes:
    """Takes a string and returns the hex of the sha within."""
    encoded = binascii.hexlify(sha)
    assert len(encoded) == 40, f"Incorrect length of sha1 string: {encoded!r}"
    return encoded
127def hex_to_sha(hex: bytes | str) -> bytes:
128 """Takes a hex sha and returns a binary sha."""
129 assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
130 try:
131 return binascii.unhexlify(hex)
132 except TypeError as exc:
133 if not isinstance(hex, bytes):
134 raise
135 raise ValueError(exc.args[0]) from exc
138def valid_hexsha(hex: bytes | str) -> bool:
139 """Check if a string is a valid hex SHA.
141 Args:
142 hex: Hex string to check
144 Returns:
145 True if valid hex SHA, False otherwise
146 """
147 if len(hex) != 40:
148 return False
149 try:
150 binascii.unhexlify(hex)
151 except (TypeError, binascii.Error):
152 return False
153 else:
154 return True
157PathT = TypeVar("PathT", str, bytes)
def hex_to_filename(path: PathT, hex: str | bytes) -> PathT:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join accepts bytes or unicode, but all args must be of the
    # same type, so coerce the sha to match the type of *path* first.  The
    # loose-object layout is <path>/<first 2 hex chars>/<remaining 38>.
    if isinstance(path, str):
        text = hex.decode("ascii") if isinstance(hex, bytes) else hex
        joined = os.path.join(path, text[:2], text[2:])
        assert isinstance(joined, str)
        return joined
    # path is bytes
    raw = hex.encode("ascii") if isinstance(hex, str) else hex
    joined_b = os.path.join(path, raw[:2], raw[2:])
    assert isinstance(joined_b, bytes)
    return joined_b
def filename_to_hex(filename: str | bytes) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # A loose object lives at <...>/<2-char dir>/<38-char file>; grab the
    # last two path components and glue them back together.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, bytes):
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        parts = filename.rsplit(sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        prefix, suffix = parts
        assert len(prefix) == 2 and len(suffix) == 38, errmsg
        hex_bytes = prefix + suffix
    else:
        parts_s = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts_s) == 2, errmsg
        prefix_s, suffix_s = parts_s
        assert len(prefix_s) == 2 and len(suffix_s) == 38, errmsg
        hex_bytes = (prefix_s + suffix_s).encode("ascii")
    # Validate before returning; raises on malformed hex.
    hex_to_sha(hex_bytes)
    return hex_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length.

    The header has the loose-object form ``<type> <length>\\0``.
    """
    obj_cls = object_class(num_type)
    if obj_cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    return b"".join([obj_cls.type_name, b" ", str(length).encode("ascii"), b"\0"])
221def serializable_property(name: str, docstring: str | None = None) -> property:
222 """A property that helps tracking whether serialization is necessary."""
224 def set(obj: "ShaFile", value: object) -> None:
225 """Set the property value and mark the object as needing serialization.
227 Args:
228 obj: The ShaFile object
229 value: The value to set
230 """
231 setattr(obj, "_" + name, value)
232 obj._needs_serialization = True
234 def get(obj: "ShaFile") -> object:
235 """Get the property value.
237 Args:
238 obj: The ShaFile object
240 Returns:
241 The property value
242 """
243 return getattr(obj, "_" + name)
245 return property(get, set, doc=docstring)
def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.

    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    return _TYPE_MAP.get(type)
def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception

    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
272def check_identity(identity: bytes | None, error_msg: str) -> None:
273 """Check if the specified identity is valid.
275 This will raise an exception if the identity is not valid.
277 Args:
278 identity: Identity string
279 error_msg: Error message to use in exception
280 """
281 if identity is None:
282 raise ObjectFormatException(error_msg)
283 email_start = identity.find(b"<")
284 email_end = identity.find(b">")
285 if not all(
286 [
287 email_start >= 1,
288 identity[email_start - 1] == b" "[0],
289 identity.find(b"<", email_start + 1) == -1,
290 email_end == len(identity) - 1,
291 b"\0" not in identity,
292 b"\n" not in identity,
293 ]
294 ):
295 raise ObjectFormatException(error_msg)
298def _path_to_bytes(path: str | bytes) -> bytes:
299 """Convert a path to bytes for use in error messages."""
300 if isinstance(path, str):
301 return path.encode("utf-8", "surrogateescape")
302 return path
def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds

    Raises:
      ObjectFormatException: if the time exceeds the signed 64-bit maximum
    """
    # Prevent overflow error
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Formats items into a space separated line."""
    joined = b" ".join(items)
    return joined + b"\n"
class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value.

    Used when the sha of an object is already known, so it never has to be
    recomputed from the content.
    """

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: str | bytes) -> None:
        """Initialize FixedSha with a fixed SHA value.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        if isinstance(hexsha, str):
            raw = hexsha.encode("ascii")
        elif isinstance(hexsha, bytes):
            raw = hexsha
        else:
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = raw
        self._sha = hex_to_sha(raw)

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")
# Type guard functions for runtime type narrowing.  Two parallel sets of
# definitions: the TYPE_CHECKING variants return TypeGuard[...] so static
# checkers can narrow ShaFile to the concrete subclass, while the runtime
# variants avoid referencing Commit/Tree/Blob/Tag (defined later in the
# file) and simply return bool.
if TYPE_CHECKING:

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
class ShaFile:
    """A git SHA file.

    Base class for git objects.  Subclasses define ``type_name``/``type_num``
    and implement the ``_serialize``/``_deserialize`` hooks; this class
    handles parsing of both the legacy (fully zlib-wrapped) and new-style
    loose-object formats, lazy serialization, and SHA-1 caching.
    """

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    # True when in-memory state has diverged from _chunked_text/_sha.
    _needs_serialization: bool
    # Object type name (e.g. b"blob"); provided by each subclass.
    type_name: bytes
    # Numeric object type; provided by each subclass.
    type_num: int
    # Cached serialized representation, as a list of byte chunks.
    _chunked_text: list[bytes] | None
    # Cached sha: a real hashlib hash, a FixedSha, or None when unknown.
    _sha: "FixedSha | None | HASH"

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file.

        Raises:
          ObjectFormatException: if the size field is not an integer or the
            type name is unknown.
        """
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        # Keep inflating until the NUL terminating the "<type> <size>"
        # header shows up in the decompressed output.
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        # Everything after the NUL is the object payload.
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Args:
          compression_level: zlib compression level (-1 selects the default)

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Re-serializing invalidates the cached sha.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(self, text: bytes, sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(self, chunks: list[bytes], sha: ObjectID | None = None) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            # Trust the caller-provided sha rather than recomputing it.
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        # The type number lives in bits 4-6 of the first byte.
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        # The size is variable-length encoded: a set high bit means more
        # size bytes follow.
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Return True if *magic* starts with a zlib header (legacy format)."""
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        # zlib header check: deflate method in the low nibble of the first
        # byte, and the two-byte header word divisible by 31.
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        """Parse a loose object file, dispatching on its on-disk format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate attributes from serialized chunks; subclass hook."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Produce serialized chunks from attributes; subclass hook."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: str | bytes) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: ObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing

        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            # Round-trip through deserialization to validate the contents.
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the "<type> <length>\\0" loose-object header."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> "FixedSha | HASH":
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> bytes:
        """The hex SHA of this object."""
        return self.sha().hexdigest().encode("ascii")

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id
class Blob(ShaFile):
    """A Git Blob object.

    Blobs hold raw file content; (de)serialization is the identity on the
    chunk list, so a blob never needs re-serialization.
    """

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    # For blobs the chunk list is always present (never None).
    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new Blob object."""
        super().__init__()
        self._chunked_text = []
        # A fresh blob's serialized form is just its (empty) chunk list.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        """Return the blob contents as a single byte string."""
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        """Replace the blob contents with *data*."""
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        """Return the blob contents as a list of chunks."""
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        """Replace the blob contents with a list of chunks."""
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        """Serialization is the identity: the chunks are the content."""
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Deserialization just stores the chunks as-is."""
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: str | bytes) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        # Joining before splitting guarantees the result is identical for
        # any chunking of the same content (and matches the single-chunk
        # case by construction).  The previous incremental implementation
        # glued two complete lines together whenever a chunk ended exactly
        # on a line boundary and the following chunks produced no
        # multi-line split (e.g. [b"a\n", b"b\n"] -> [b"a\nb\n"]).
        return b"".join(self.chunked).splitlines(True)
def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.

    Returns: iterator of tuples of (field, value), one per header line, in the
        order read from the text, possibly including duplicates. Includes a
        field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None  # field name of the header currently being accumulated
    v = b""  # value accumulated so far for field k
    eof = False

    def _strip_last_newline(value: bytes) -> bytes:
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)

    else:
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()
860def _format_message(
861 headers: Sequence[tuple[bytes, bytes]], body: bytes | None
862) -> Iterator[bytes]:
863 for field, value in headers:
864 lines = value.split(b"\n")
865 yield git_line(field, lines[0])
866 for line in lines[1:]:
867 yield b" " + line + b"\n"
868 yield b"\n" # There must be a new line after the headers
869 if body:
870 yield body
873class Tag(ShaFile):
874 """A Git Tag object."""
876 type_name = b"tag"
877 type_num = 4
879 __slots__ = (
880 "_message",
881 "_name",
882 "_object_class",
883 "_object_sha",
884 "_signature",
885 "_tag_time",
886 "_tag_timezone",
887 "_tag_timezone_neg_utc",
888 "_tagger",
889 )
891 _message: bytes | None
892 _name: bytes | None
893 _object_class: "type[ShaFile] | None"
894 _object_sha: bytes | None
895 _signature: bytes | None
896 _tag_time: int | None
897 _tag_timezone: int | None
898 _tag_timezone_neg_utc: bool | None
899 _tagger: bytes | None
    def __init__(self) -> None:
        """Initialize a new Tag object.

        Optional fields (tagger, tag time, timezone, signature) start unset;
        the negative-UTC timezone flag defaults to False.
        """
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: bytes | None = None
    @classmethod
    def from_path(cls, filename: str | bytes) -> "Tag":
        """Read a tag from a file on disk.

        Args:
          filename: Path to the tag file

        Returns:
          A Tag object

        Raises:
          NotTagError: If the file is not a tag
        """
        tag = ShaFile.from_path(filename)
        if not isinstance(tag, cls):
            raise NotTagError(_path_to_bytes(filename))
        return tag
    def check(self) -> None:
        """Check this object for internal consistency.

        Validates mandatory fields (object sha/class, name, tag time), the
        tagger identity if present, and the header order in the raw text.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce the canonical header order: object, type, tag, tagger.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field
    def _serialize(self) -> list[bytes]:
        """Serialize this tag into its canonical chunked byte form.

        Returns:
          List of byte chunks making up the tag object.

        Raises:
          ObjectFormatException: if a mandatory field (object sha, object
            class, tag name) is missing, or timezone info is absent while a
            tag time is set.
        """
        headers = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # Tagger without a timestamp is emitted verbatim.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        # The signature, when present, is appended directly after the message.
        if self.message is None and self._signature is None:
            body = None
        else:
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))
    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag.

        Parses the header fields and splits the freeform body into message
        and (optional) detached PGP or SSH signature.

        Raises:
          ObjectFormatException: on unknown object types, missing tagger
            values or unknown header fields.
        """
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # The None field is the freeform body (message + signature).
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        # Everything from the armor marker onward is signature.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )
    def _get_object(self) -> tuple[type[ShaFile], bytes]:
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).

        Raises:
          ValueError: if object class or sha has not been set yet.
        """
        if self._object_class is None or self._object_sha is None:
            raise ValueError("Tag object is not properly initialized")
        return (self._object_class, self._object_sha)

    def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
        """Point this tag at (object class, sha) and mark it for reserialization."""
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    # Each serializable_property stores to the underscore-prefixed attribute
    # and flags the tag as needing re-serialization on write.
    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")
    def sign(self, keyid: str | None = None) -> None:
        """Sign this tag with a GPG key.

        Stores a detached, ASCII-armored signature in ``self.signature``.

        Args:
          keyid: Optional GPG key ID to use for signing. If not specified,
            the default GPG key will be used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                # A second context is created so the chosen key can be
                # attached as the signer.
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.signature, _unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                # No key specified: sign with the default key.
                self.signature, _unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )
1104 def raw_without_sig(self) -> bytes:
1105 """Return raw string serialization without the GPG/SSH signature.
1107 self.signature is a signature for the returned raw byte string serialization.
1108 """
1109 ret = self.as_raw_string()
1110 if self._signature:
1111 ret = ret[: -len(self._signature)]
1112 return ret
1114 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
1115 """Extract the payload, signature, and signature type from this tag.
1117 Returns:
1118 Tuple of (``payload``, ``signature``, ``signature_type``) where:
1120 - ``payload``: The raw tag data without the signature
1121 - ``signature``: The signature bytes if present, None otherwise
1122 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature
1124 Raises:
1125 ObjectFormatException: If signature has unknown format
1126 """
1127 if self._signature is None:
1128 return self.as_raw_string(), None, None
1130 payload = self.raw_without_sig()
1132 # Determine signature type
1133 if self._signature.startswith(BEGIN_PGP_SIGNATURE):
1134 sig_type = SIGNATURE_PGP
1135 elif self._signature.startswith(BEGIN_SSH_SIGNATURE):
1136 sig_type = SIGNATURE_SSH
1137 else:
1138 raise ObjectFormatException("Unknown signature format")
1140 return payload, self._signature, sig_type
    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        if self._signature is None:
            # Unsigned tags trivially pass verification.
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises gpg.errors.BadSignatures on an invalid
            # signature; the payload is the serialization minus the signature.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            # Accept as soon as any signing-capable subkey
                            # fingerprint matches a signature fingerprint.
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes
    mode: int
    sha: bytes

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Path prefix to prepend to this entry's path.

        Returns:
          New TreeEntry with the combined path.

        Raises:
          TypeError: if this entry's path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the offending value (self.path), not the
            # prefix argument, which is what is actually being validated.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a serialized tree body.

    Args:
      text: Serialized text to parse
      strict: If True, reject modes with a leading zero
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # The mode is an octal number terminated by a single space.
        space = text.index(b" ", pos)
        raw_mode = text[pos:space]
        if strict and raw_mode.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}")
        try:
            mode = int(raw_mode, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {raw_mode!r}") from exc
        # The name is NUL-terminated and followed by a 20-byte binary SHA.
        nul = text.index(b"\0", space)
        name = text[space + 1 : nul]
        pos = nul + 21
        raw_sha = text[nul + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(raw_sha))
def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        # Each entry is: octal mode, space, name, NUL, 20-byte binary sha.
        yield b"".join(
            [("%04o" % mode).encode("ascii"), b" ", name, b"\0", hex_to_sha(hexsha)]
        )
def sorted_tree_items(
    entries: dict[bytes, tuple[int, bytes]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    sort_key = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=sort_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        numeric_mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, numeric_mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry.

    Directories sort as if their name carried a trailing slash, matching
    git's tree ordering.

    Args:
      entry: (name, (mode, sha)) tuple
    """
    name, (mode, _sha) = entry
    if stat.S_ISDIR(mode):
        return name + b"/"
    return name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry when plain name ordering is wanted."""
    name, _value = entry
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    # Anything with the directory bit set is shown as a tree, all else as a blob.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    decoded_name = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {hexsha.decode('ascii')}\t{decoded_name}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record where the submodule entry was found.

        Args:
          path: Path where the submodule was encountered
          sha: SHA of the submodule
        """
        self.sha = sha
        self.path = path
class Tree(ShaFile):
    """A Git tree object.

    Behaves like a mutable mapping from entry name (bytes) to a
    (mode, hexsha) tuple.
    """

    type_name = b"tree"
    type_num = 2

    # A plain string here defines a single slot named "_entries".
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        # Maps entry name -> (mode, hex sha).
        self._entries: dict[bytes, tuple[int, bytes]] = {}

    @classmethod
    def from_path(cls, filename: str | bytes) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        # Re-parse in strict mode so malformed modes are rejected too.
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be unique and sorted in tree order (key_entry
            # treats directories as having a trailing slash).
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        """Serialize the entries in tree order."""
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if a gitlink entry is crossed while
            descending into the path
          ValueError: if the path contains no non-empty components
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/trailing/doubled slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.

    Returns: Tuple with timezone as seconds difference to UTC
      and a boolean indicating whether this was a UTC timezone
      prefixed with a negative sign (-0000).

    Raises:
      ValueError: if the fragment does not start with '+' or '-'.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        # f-string replaces the previous ".format(**vars())" hack;
        # the rendered message is identical.
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # Git tolerates "-0000" (and "--0700" etc.); remember the redundant
    # minus sign so the value can be round-tripped exactly.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    offset = abs(offset)
    # The offset is encoded as HHMM, not as a number of minutes.
    hours, minutes = divmod(offset, 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).

    Returns: Timezone fragment such as b"+0100".

    Raises:
      ValueError: if offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Integer division replaces the old float division that silently relied
    # on %d truncation (and needed a noqa); output is identical.
    return f"{sign}{offset // 3600:02d}{(offset // 60) % 60:02d}".encode("ascii")
def parse_time_entry(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse an identity + timestamp line (author/committer/tagger).

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    try:
        sep = value.rindex(b"> ")
    except ValueError:
        # No "> " separator: the whole value is the identity, no timestamp.
        return (value, None, (None, False))
    person = value[0 : sep + 1]
    try:
        timetext, tztext = value[sep + 2 :].rsplit(b" ", 1)
        when = int(timetext)
        tz_offset, tz_neg_utc = parse_timezone(tztext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, when, (tz_offset, tz_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an identity + timestamp line for serialization."""
    offset, neg_utc = timezone_info
    # "<person> <seconds-since-epoch> <timezone>"
    return (
        person + b" " + str(time).encode("ascii") + b" " + format_timezone(offset, neg_utc)
    )
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated; kept only for backwards compatibility (see @replace_me).

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    parents = []
    extra = []
    tree = None
    # Identity triples stay all-None until the matching header is seen.
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # The embedded tag's trailing newline was consumed during
            # message parsing, so restore it before re-parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # A None field marks the free-form commit message body.
            message = value
        else:
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            # Unknown headers are kept verbatim for round-tripping.
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )
class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Initialize an empty Commit."""
        super().__init__()
        self._parents: list[bytes] = []
        self._encoding: bytes | None = None
        self._mergetag: list[Tag] = []
        self._gpgsig: bytes | None = None
        # Unknown headers, kept verbatim so the object round-trips.
        self._extra: list[tuple[bytes, bytes | None]] = []
        self._author_timezone_neg_utc: bool | None = False
        self._commit_timezone_neg_utc: bool | None = False

    @classmethod
    def from_path(cls, path: str | bytes) -> "Commit":
        """Read a commit from a file on disk.

        Args:
          path: Path to the commit file

        Returns:
          A Commit object

        Raises:
          NotCommitError: If the file is not a commit
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate the commit's fields from its serialized chunks."""
        self._parents = []
        self._extra = []
        self._tree = None
        author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(value)
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # The embedded tag lost its trailing newline during message
                # parsing; restore it before re-parsing as a Tag.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # A None field marks the free-form commit message body.
                self._message = value
            else:
                # Unknown headers are preserved verbatim in _extra.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Verify that the headers appear in canonical order:
        # tree, parent*, author, committer, [encoding], ...
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: str | None = None) -> None:
        """Sign this commit with a GPG key.

        Stores a detached, ASCII-armored signature in ``self.gpgsig``.

        Args:
          keyid: Optional GPG key ID to use for signing. If not specified,
            the default GPG key will be used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                # A second context is created so the chosen key can be
                # attached as the signer.
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, _unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                # No key specified: sign with the default key.
                self.gpgsig, _unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Serialize a signature-less copy so self stays untouched.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this commit.

        Returns:
          Tuple of (``payload``, ``signature``, ``signature_type``) where:

          - ``payload``: The raw commit data without the signature
          - ``signature``: The signature bytes if present, None otherwise
          - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
          ObjectFormatException: If signature has unknown format
        """
        if self._gpgsig is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Determine signature type from the armor header.
        if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE):
            sig_type = SIGNATURE_PGP
        elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE):
            sig_type = SIGNATURE_SSH
        else:
            raise ObjectFormatException("Unknown signature format")

        return payload, self._gpgsig, sig_type

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        if self._gpgsig is None:
            # Unsigned commits trivially pass verification.
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises gpg.errors.BadSignatures on an invalid
            # signature; the payload excludes the signature header itself.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            # Accept as soon as any signing-capable subkey
                            # fingerprint matches a signature fingerprint.
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Serialize headers and message in canonical commit order."""
        headers = []
        assert self._tree is not None
        # The tree attribute may hold either a Tree object or a hex sha.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Drop the embedded tag's trailing newline; _deserialize adds
            # it back when re-parsing.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[bytes]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[bytes]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, bytes | None]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Lookup table from both the textual type name (b"commit", ...) and the
# numeric type id to the corresponding ShaFile subclass.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls


# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Rust extension not built/installed; keep the pure-Python versions.
    pass
else:
    # Shadow the module-level functions with the Rust implementations.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs