Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/objects.py: 45%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# objects.py -- Access to base git objects
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
23"""Access to base git objects."""
# Explicit public API of this module.
__all__ = [
    "BEGIN_PGP_SIGNATURE",
    "BEGIN_SSH_SIGNATURE",
    "MAX_TIME",
    "OBJECT_CLASSES",
    "SIGNATURE_PGP",
    "SIGNATURE_SSH",
    "S_IFGITLINK",
    "S_ISGITLINK",
    "ZERO_SHA",
    "Blob",
    "Commit",
    "EmptyFileException",
    "FixedSha",
    "ObjectID",
    "RawObjectID",
    "ShaFile",
    "SubmoduleEncountered",
    "Tag",
    "Tree",
    "TreeEntry",
    "check_hexsha",
    "check_identity",
    "check_time",
    "filename_to_hex",
    "format_time_entry",
    "format_timezone",
    "git_line",
    "hex_to_filename",
    "hex_to_sha",
    "is_blob",
    "is_commit",
    "is_tag",
    "is_tree",
    "key_entry",
    "key_entry_name_order",
    "object_class",
    "object_header",
    "parse_commit",
    "parse_time_entry",
    "parse_timezone",
    "parse_tree",
    "pretty_format_tree_entry",
    "serializable_property",
    "serialize_tree",
    "sha_to_hex",
    "sorted_tree_items",
    "valid_hexsha",
]
75import binascii
76import os
77import posixpath
78import stat
79import sys
80import zlib
81from collections.abc import Callable, Iterable, Iterator, Sequence
82from hashlib import sha1
83from io import BufferedIOBase, BytesIO
84from typing import (
85 IO,
86 TYPE_CHECKING,
87 NamedTuple,
88 TypeVar,
89)
91if sys.version_info >= (3, 11):
92 from typing import Self
93else:
94 from typing_extensions import Self
96from typing import NewType, TypeGuard
98from . import replace_me
99from .errors import (
100 ChecksumMismatch,
101 FileFormatException,
102 NotBlobError,
103 NotCommitError,
104 NotTagError,
105 NotTreeError,
106 ObjectFormatException,
107)
108from .file import GitFile
110if TYPE_CHECKING:
111 from _hashlib import HASH
113 from .file import _GitFile
# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"

# File-type mode bits used by git for a submodule (gitlink) tree entry.
S_IFGITLINK = 0o160000

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"

# Hex SHA type (40 hex characters as bytes)
ObjectID = NewType("ObjectID", bytes)

# Raw SHA type (20-byte binary digest)
RawObjectID = NewType("RawObjectID", bytes)

# Zero SHA constant
ZERO_SHA: ObjectID = ObjectID(b"0" * 40)
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised when an object file read from disk has zero length.
    """
def S_ISGITLINK(m: int) -> bool:
    """Check if a mode indicates a submodule.

    Args:
      m: Mode to check
    Returns: a ``boolean``
    """
    # Compare only the file-type bits of the mode against the gitlink type.
    return stat.S_IFMT(m) == S_IFGITLINK
168def _decompress(string: bytes) -> bytes:
169 dcomp = zlib.decompressobj()
170 dcomped = dcomp.decompress(string)
171 dcomped += dcomp.flush()
172 return dcomped
def sha_to_hex(sha: RawObjectID) -> ObjectID:
    """Convert a 20-byte binary digest into its 40-byte hex representation."""
    hexsha = bytes(sha).hex().encode("ascii")
    assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}"
    return ObjectID(hexsha)
def hex_to_sha(hex: ObjectID | str) -> RawObjectID:
    """Takes a hex sha and returns a binary sha."""
    assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
    try:
        return RawObjectID(binascii.unhexlify(hex))
    except TypeError as exc:
        # NOTE(review): presumably normalizes a legacy TypeError from
        # unhexlify into ValueError for bytes input, while letting genuine
        # type errors (non-str/bytes arguments) propagate — confirm.
        if not isinstance(hex, bytes):
            raise
        raise ValueError(exc.args[0]) from exc
193def valid_hexsha(hex: bytes | str) -> bool:
194 """Check if a string is a valid hex SHA.
196 Args:
197 hex: Hex string to check
199 Returns:
200 True if valid hex SHA, False otherwise
201 """
202 if len(hex) != 40:
203 return False
204 try:
205 binascii.unhexlify(hex)
206 except (TypeError, binascii.Error):
207 return False
208 else:
209 return True
# Constrained TypeVar: a path is either str or bytes, never a mix.
PathT = TypeVar("PathT", str, bytes)
def hex_to_filename(path: PathT, hex: str | bytes) -> PathT:
    """Takes a hex sha and returns its filename relative to the given path."""
    # os.path.join requires all components to be of one type, so coerce the
    # sha to match *path*, then fan out into a two-char directory plus the
    # remaining 38 characters as the file name.
    if isinstance(path, str):
        hex_str = hex.decode("ascii") if isinstance(hex, bytes) else hex
        joined = os.path.join(path, hex_str[:2], hex_str[2:])
        assert isinstance(joined, str)
        return joined
    # path is bytes
    hex_bytes = hex.encode("ascii") if isinstance(hex, str) else hex
    joined_b = os.path.join(path, hex_bytes[:2], hex_bytes[2:])
    assert isinstance(joined_b, bytes)
    return joined_b
def filename_to_hex(filename: str | bytes) -> str:
    """Takes an object filename and returns its corresponding hex sha."""
    # The sha is spread over the last two path components: a two-character
    # fan-out directory and a 38-character file name.
    errmsg = f"Invalid object filename: {filename!r}"
    if isinstance(filename, str):
        parts = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(parts) == 2, errmsg
        fan_out, remainder = parts
        assert len(fan_out) == 2 and len(remainder) == 38, errmsg
        hex_bytes = (fan_out + remainder).encode("ascii")
    else:
        # filename is bytes
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        parts_b = filename.rsplit(sep, 2)[-2:]
        assert len(parts_b) == 2, errmsg
        fan_out_b, remainder_b = parts_b
        assert len(fan_out_b) == 2 and len(remainder_b) == 38, errmsg
        hex_bytes = fan_out_b + remainder_b
    # Round-trip through hex_to_sha purely to validate the hex digits.
    hex_to_sha(ObjectID(hex_bytes))
    return hex_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Return an object header for the given numeric type and text length."""
    cls = object_class(num_type)
    if cls is None:
        raise AssertionError(f"unsupported class type num: {num_type}")
    # "<type> <decimal length>\0", e.g. b"blob 42\0".
    return b"%s %d\x00" % (cls.type_name, length)
276def serializable_property(name: str, docstring: str | None = None) -> property:
277 """A property that helps tracking whether serialization is necessary."""
279 def set(obj: "ShaFile", value: object) -> None:
280 """Set the property value and mark the object as needing serialization.
282 Args:
283 obj: The ShaFile object
284 value: The value to set
285 """
286 setattr(obj, "_" + name, value)
287 obj._needs_serialization = True
289 def get(obj: "ShaFile") -> object:
290 """Get the property value.
292 Args:
293 obj: The ShaFile object
295 Returns:
296 The property value
297 """
298 return getattr(obj, "_" + name)
300 return property(get, set, doc=docstring)
def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Get the object class corresponding to the given type.

    Args:
      type: Either a type name string or a numeric type.
    Returns: The ShaFile subclass corresponding to the given type, or None if
        type is not a valid type name/number.
    """
    # _TYPE_MAP is keyed by both type names and numeric type ids.
    return _TYPE_MAP.get(type)
def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Check if a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
def check_identity(identity: bytes | None, error_msg: str) -> None:
    """Check if the specified identity is valid.

    This will raise an exception if the identity is not valid.

    A valid identity looks like ``Name <email>``: exactly one ``<``, preceded
    by a space, with ``>`` as the very last byte, and no NUL or newline
    anywhere.

    Args:
      identity: Identity string
      error_msg: Error message to use in exception
    Raises:
      ObjectFormatException: if the identity is None or malformed
    """
    if identity is None:
        raise ObjectFormatException(error_msg)
    email_start = identity.find(b"<")
    email_end = identity.find(b">")
    # Short-circuit evaluation matters here: the original eager all([...])
    # form indexed identity[email_start - 1] even when email_start was -1 or
    # 0, raising IndexError (instead of ObjectFormatException) for short
    # identities such as b"" or b"x".  Slicing plus `or` avoids that.
    if (
        email_start < 1
        or identity[email_start - 1 : email_start] != b" "
        or identity.find(b"<", email_start + 1) != -1
        or email_end != len(identity) - 1
        or b"\0" in identity
        or b"\n" in identity
    ):
        raise ObjectFormatException(error_msg)
353def _path_to_bytes(path: str | bytes) -> bytes:
354 """Convert a path to bytes for use in error messages."""
355 if isinstance(path, str):
356 return path.encode("utf-8", "surrogateescape")
357 return path
def check_time(time_seconds: int) -> None:
    """Check if the specified time is not prone to overflow error.

    This will raise an exception if the time is not valid.

    Args:
      time_seconds: time in seconds
    """
    # Reject anything beyond a signed 64-bit second count.
    if time_seconds <= MAX_TIME:
        return
    raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Format *items* as a single space-separated, newline-terminated line."""
    line = b" ".join(items)
    return line + b"\n"
class FixedSha:
    """SHA object that behaves like hashlib's but is given a fixed value."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: str | bytes) -> None:
        """Initialize FixedSha with a fixed SHA value.

        Args:
          hexsha: Hex SHA value as string or bytes
        """
        raw = hexsha.encode("ascii") if isinstance(hexsha, str) else hexsha
        if not isinstance(raw, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {raw!r}")
        self._hexsha = raw
        # Precompute the binary form so digest() is a plain attribute read.
        self._sha = hex_to_sha(ObjectID(raw))

    def digest(self) -> bytes:
        """Return the raw SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest."""
        return self._hexsha.decode("ascii")
# Type guard functions for runtime type narrowing
if TYPE_CHECKING:
    # These TypeGuard variants are only seen by static type checkers; they
    # let `if is_commit(obj): ...` narrow obj to the concrete subclass.

    def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
else:
    # Runtime versions without type narrowing
    def is_commit(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Commit."""
        return obj.type_name == b"commit"

    def is_tree(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tree."""
        return obj.type_name == b"tree"

    def is_blob(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Blob."""
        return obj.type_name == b"blob"

    def is_tag(obj: "ShaFile") -> bool:
        """Check if a ShaFile is a Tag."""
        return obj.type_name == b"tag"
class ShaFile:
    """A git SHA file."""

    __slots__ = ("_chunked_text", "_needs_serialization", "_sha")

    _needs_serialization: bool
    type_name: bytes
    type_num: int
    _chunked_text: list[bytes] | None
    _sha: "FixedSha | None | HASH"

    @staticmethod
    def _parse_legacy_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a legacy object, creating it but not reading the file."""
        bufsize = 1024
        decomp = zlib.decompressobj()
        header = decomp.decompress(magic)
        start = 0
        end = -1
        while end < 0:
            extra = f.read(bufsize)
            header += decomp.decompress(extra)
            magic += extra
            end = header.find(b"\0", start)
            start = len(header)
            # A truncated stream previously looped forever here: once the
            # file is exhausted (read() returns b"") and no NUL has been
            # seen, the header can never be completed — bail out instead.
            if not extra and end < 0:
                raise ObjectFormatException("Object header not terminated")
        header = header[:end]
        type_name, size = header.split(b" ", 1)
        try:
            int(size)  # sanity check
        except ValueError as exc:
            raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
        obj_class = object_class(type_name)
        if not obj_class:
            raise ObjectFormatException(
                "Not a known type: {}".format(type_name.decode("ascii"))
            )
        return obj_class()

    def _parse_legacy_object(self, map: bytes) -> None:
        """Parse a legacy object, setting the raw string."""
        text = _decompress(map)
        header_end = text.find(b"\0")
        if header_end < 0:
            raise ObjectFormatException("Invalid object header, no \\0")
        self.set_raw_string(text[header_end + 1 :])

    def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
        """Return chunks representing the object in the experimental format.

        Returns: List of strings
        """
        compobj = zlib.compressobj(compression_level)
        yield compobj.compress(self._header())
        for chunk in self.as_raw_chunks():
            yield compobj.compress(chunk)
        yield compobj.flush()

    def as_legacy_object(self, compression_level: int = -1) -> bytes:
        """Return string representing the object in the experimental format."""
        return b"".join(
            self.as_legacy_object_chunks(compression_level=compression_level)
        )

    def as_raw_chunks(self) -> list[bytes]:
        """Return chunks with serialization of the object.

        Returns: List of strings, not necessarily one per line
        """
        if self._needs_serialization:
            # Invalidate any cached SHA before re-serializing.
            self._sha = None
            self._chunked_text = self._serialize()
            self._needs_serialization = False
        assert self._chunked_text is not None
        return self._chunked_text

    def as_raw_string(self) -> bytes:
        """Return raw string with serialization of the object.

        Returns: String object
        """
        return b"".join(self.as_raw_chunks())

    def __bytes__(self) -> bytes:
        """Return raw string serialization of this object."""
        return self.as_raw_string()

    def __hash__(self) -> int:
        """Return unique hash for this object."""
        return hash(self.id)

    def as_pretty_string(self) -> str:
        """Return a string representing this object, fit for display."""
        return self.as_raw_string().decode("utf-8", "replace")

    def set_raw_string(
        self, text: bytes, sha: ObjectID | RawObjectID | None = None
    ) -> None:
        """Set the contents of this object from a serialized string."""
        if not isinstance(text, bytes):
            raise TypeError(f"Expected bytes for text, got {text!r}")
        self.set_raw_chunks([text], sha)

    def set_raw_chunks(
        self, chunks: list[bytes], sha: ObjectID | RawObjectID | None = None
    ) -> None:
        """Set the contents of this object from a list of chunks."""
        self._chunked_text = chunks
        self._deserialize(chunks)
        if sha is None:
            self._sha = None
        else:
            self._sha = FixedSha(sha)
        self._needs_serialization = False

    @staticmethod
    def _parse_object_header(
        magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
    ) -> "ShaFile":
        """Parse a new style object, creating it but not reading the file."""
        num_type = (ord(magic[0:1]) >> 4) & 7
        obj_class = object_class(num_type)
        if not obj_class:
            raise ObjectFormatException(f"Not a known type {num_type}")
        return obj_class()

    def _parse_object(self, map: bytes) -> None:
        """Parse a new style object, setting self._text."""
        # skip type and size; type must have already been determined, and
        # we trust zlib to fail if it's otherwise corrupted
        byte = ord(map[0:1])
        used = 1
        while (byte & 0x80) != 0:
            byte = ord(map[used : used + 1])
            used += 1
        raw = map[used:]
        self.set_raw_string(_decompress(raw))

    @classmethod
    def _is_legacy_object(cls, magic: bytes) -> bool:
        """Detect the legacy (plain zlib stream) loose-object format."""
        # zlib header check (RFC 1950): low nibble of CMF is 8 (deflate)
        # and the big-endian CMF/FLG word is a multiple of 31.
        b0 = ord(magic[0:1])
        b1 = ord(magic[1:2])
        word = (b0 << 8) + b1
        return (b0 & 0x8F) == 0x08 and (word % 31) == 0

    @classmethod
    def _parse_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        """Read a loose object from *f*, dispatching on the header format."""
        map = f.read()
        if not map:
            raise EmptyFileException("Corrupted empty file detected")

        if cls._is_legacy_object(map):
            obj = cls._parse_legacy_object_header(map, f)
            obj._parse_legacy_object(map)
        else:
            obj = cls._parse_object_header(map, f)
            obj._parse_object(map)
        return obj

    def __init__(self) -> None:
        """Don't call this directly."""
        self._sha = None
        self._chunked_text = []
        self._needs_serialization = True

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate this object's fields from raw chunks (subclass hook)."""
        raise NotImplementedError(self._deserialize)

    def _serialize(self) -> list[bytes]:
        """Serialize this object's fields into raw chunks (subclass hook)."""
        raise NotImplementedError(self._serialize)

    @classmethod
    def from_path(cls, path: str | bytes) -> "ShaFile":
        """Open a SHA file from disk."""
        with GitFile(path, "rb") as f:
            return cls.from_file(f)

    @classmethod
    def from_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
        """Get the contents of a SHA file on disk."""
        try:
            obj = cls._parse_file(f)
            obj._sha = None
            return obj
        except (IndexError, ValueError) as exc:
            raise ObjectFormatException("invalid object header") from exc

    @staticmethod
    def from_raw_string(
        type_num: int, string: bytes, sha: ObjectID | RawObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw string given.

        Args:
          type_num: The numeric type of the object.
          string: The raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_string(string, sha)
        return obj

    @staticmethod
    def from_raw_chunks(
        type_num: int, chunks: list[bytes], sha: ObjectID | RawObjectID | None = None
    ) -> "ShaFile":
        """Creates an object of the indicated type from the raw chunks given.

        Args:
          type_num: The numeric type of the object.
          chunks: An iterable of the raw uncompressed contents.
          sha: Optional known sha for the object
        """
        cls = object_class(type_num)
        if cls is None:
            raise AssertionError(f"unsupported class type num: {type_num}")
        obj = cls()
        obj.set_raw_chunks(chunks, sha)
        return obj

    @classmethod
    def from_string(cls, string: bytes) -> Self:
        """Create a ShaFile from a string."""
        obj = cls()
        obj.set_raw_string(string)
        return obj

    def _check_has_member(self, member: str, error_msg: str) -> None:
        """Check that the object has a given member variable.

        Args:
          member: the member variable to check for
          error_msg: the message for an error if the member is missing
        Raises:
          ObjectFormatException: with the given error_msg if member is
            missing or is None
        """
        if getattr(self, member, None) is None:
            raise ObjectFormatException(error_msg)

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
          ChecksumMismatch: if the object was created with a SHA that does
            not match its contents
        """
        # TODO: if we find that error-checking during object parsing is a
        # performance bottleneck, those checks should be moved to the class's
        # check() method during optimization so we can still check the object
        # when necessary.
        old_sha = self.id
        try:
            self._deserialize(self.as_raw_chunks())
            self._sha = None
            new_sha = self.id
        except Exception as exc:
            raise ObjectFormatException(exc) from exc
        if old_sha != new_sha:
            raise ChecksumMismatch(new_sha, old_sha)

    def _header(self) -> bytes:
        """Return the git object header ("<type> <length>\\0") for this object."""
        return object_header(self.type_num, self.raw_length())

    def raw_length(self) -> int:
        """Returns the length of the raw string of this object."""
        return sum(map(len, self.as_raw_chunks()))

    def sha(self) -> "FixedSha | HASH":
        """The SHA1 object that is the name of this object."""
        if self._sha is None or self._needs_serialization:
            # this is a local because as_raw_chunks() overwrites self._sha
            new_sha = sha1()
            new_sha.update(self._header())
            for chunk in self.as_raw_chunks():
                new_sha.update(chunk)
            self._sha = new_sha
        return self._sha

    def copy(self) -> "ShaFile":
        """Create a new copy of this SHA1 object from its raw string."""
        obj_class = object_class(self.type_num)
        if obj_class is None:
            raise AssertionError(f"invalid type num {self.type_num}")
        return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)

    @property
    def id(self) -> ObjectID:
        """The hex SHA of this object."""
        return ObjectID(self.sha().hexdigest().encode("ascii"))

    def __repr__(self) -> str:
        """Return string representation of this object."""
        return f"<{self.__class__.__name__} {self.id!r}>"

    def __ne__(self, other: object) -> bool:
        """Check whether this object does not match the other."""
        return not isinstance(other, ShaFile) or self.id != other.id

    def __eq__(self, other: object) -> bool:
        """Return True if the SHAs of the two objects match."""
        return isinstance(other, ShaFile) and self.id == other.id

    def __lt__(self, other: object) -> bool:
        """Return whether SHA of this object is less than the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id < other.id

    def __le__(self, other: object) -> bool:
        """Check whether SHA of this object is less than or equal to the other."""
        if not isinstance(other, ShaFile):
            raise TypeError
        return self.id <= other.id
class Blob(ShaFile):
    """A Git Blob object."""

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new Blob object."""
        super().__init__()
        self._chunked_text = []
        # A blob's serialization is just its chunks, so it is never stale.
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(cls, path: str | bytes) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings.
        """
        chunks = self.chunked
        if not chunks:
            return []
        if len(chunks) == 1:
            result: list[bytes] = chunks[0].splitlines(True)
            return result
        # Chunk boundaries are arbitrary: they can fall mid-line, inside a
        # b"\r\n" pair, or exactly on a line terminator.  The previous
        # incremental merge glued complete lines together when a chunk
        # ended on a terminator (e.g. [b"a\n", b"b\n"] -> [b"a\nb\n"]), so
        # join first and let bytes.splitlines() do the splitting correctly.
        return b"".join(chunks).splitlines(True)
def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
        order read from the text, possibly including duplicates. Includes a
        field named None for the freeform tag/commit text.
    """
    f = BytesIO(b"".join(chunks))
    k = None  # name of the header currently being accumulated
    v = b""  # accumulated (possibly multi-line) value for that header
    eof = False

    def _strip_last_newline(value: bytes) -> bytes:
        """Strip the last newline from value."""
        if value and value.endswith(b"\n"):
            return value[:-1]
        return value

    # Parse the headers
    #
    # Headers can contain newlines. The next line is indented with a space.
    # We store the latest key as 'k', and the accumulated value as 'v'.
    for line in f:
        if line.startswith(b" "):
            # Indented continuation of the previous line
            v += line[1:]
        else:
            if k is not None:
                # We parsed a new header, return its value
                yield (k, _strip_last_newline(v))
            if line == b"\n":
                # Empty line indicates end of headers
                break
            (k, v) = line.split(b" ", 1)

    else:
        # We reached end of file before the headers ended. We still need to
        # return the previous header, then we need to return a None field for
        # the text.
        eof = True
        if k is not None:
            yield (k, _strip_last_newline(v))
        yield (None, None)

    if not eof:
        # We didn't reach the end of file while parsing headers. We can return
        # the rest of the file as a message.
        yield (None, f.read())

    f.close()
def _format_message(
    headers: Sequence[tuple[bytes, bytes]], body: bytes | None
) -> Iterator[bytes]:
    """Serialize (field, value) headers plus an optional body into chunks.

    Multi-line header values are emitted with each continuation line
    prefixed by a single space, matching the format _parse_message reads.
    """
    for name, value in headers:
        first, *continuations = value.split(b"\n")
        yield git_line(name, first)
        for continuation in continuations:
            yield b" " + continuation + b"\n"
    yield b"\n"  # There must be a new line after the headers
    if body:
        yield body
class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    # Parsed tag metadata; populated by _deserialize() or the property
    # setters.  Optional fields stay None when absent from the tag.
    _message: bytes | None
    _name: bytes | None
    _object_class: "type[ShaFile] | None"
    _object_sha: bytes | None
    _signature: bytes | None
    _tag_time: int | None
    _tag_timezone: int | None
    _tag_timezone_neg_utc: bool | None
    _tagger: bytes | None
    def __init__(self) -> None:
        """Initialize a new Tag object."""
        super().__init__()
        # Optional fields default to "absent"; _deserialize() overwrites them.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature: bytes | None = None
969 @classmethod
970 def from_path(cls, filename: str | bytes) -> "Tag":
971 """Read a tag from a file on disk.
973 Args:
974 filename: Path to the tag file
976 Returns:
977 A Tag object
979 Raises:
980 NotTagError: If the file is not a tag
981 """
982 tag = ShaFile.from_path(filename)
983 if not isinstance(tag, cls):
984 raise NotTagError(_path_to_bytes(filename))
985 return tag
    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Enforce the canonical header order: object, type, tag, [tagger].
        # `last` tracks the previously seen header name.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _OBJECT_HEADER and last is not None:
                raise ObjectFormatException("unexpected object")
            elif field == _TYPE_HEADER and last != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif field == _TAG_HEADER and last != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif field == _TAGGER_HEADER and last != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            last = field
    def _serialize(self) -> list[bytes]:
        """Serialize this tag into header chunks plus message/signature body."""
        headers: list[tuple[bytes, bytes]] = []
        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))
        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))
        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))
        if self._tagger:
            if self._tag_time is None:
                # No timestamp recorded: emit the tagger identity alone.
                headers.append((_TAGGER_HEADER, self._tagger))
            else:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                headers.append(
                    (
                        _TAGGER_HEADER,
                        format_time_entry(
                            self._tagger,
                            self._tag_time,
                            (self._tag_timezone, self._tag_timezone_neg_utc),
                        ),
                    )
                )

        if self.message is None and self._signature is None:
            body = None
        else:
            # A detached signature is stored appended to the message body.
            body = (self.message or b"") + (self._signature or b"")
        return list(_format_message(headers, body))
    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        # Reset optional fields so a re-deserialize doesn't keep stale values.
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                obj_class = object_class(value)
                if not obj_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = obj_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                # The None field carries the freeform text after the headers.
                if value is None:
                    self._message = None
                    self._signature = None
                else:
                    # Try to find either PGP or SSH signature
                    sig_idx = None
                    try:
                        sig_idx = value.index(BEGIN_PGP_SIGNATURE)
                    except ValueError:
                        try:
                            sig_idx = value.index(BEGIN_SSH_SIGNATURE)
                        except ValueError:
                            pass

                    if sig_idx is not None:
                        # Split the freeform text into message + detached signature.
                        self._message = value[:sig_idx]
                        self._signature = value[sig_idx:]
                    else:
                        self._message = value
                        self._signature = None
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )
1111 def _get_object(self) -> tuple[type[ShaFile], bytes]:
1112 """Get the object pointed to by this tag.
1114 Returns: tuple of (object class, sha).
1115 """
1116 if self._object_class is None or self._object_sha is None:
1117 raise ValueError("Tag object is not properly initialized")
1118 return (self._object_class, self._object_sha)
1120 def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
1121 (self._object_class, self._object_sha) = value
1122 self._needs_serialization = True
    # Expose (_object_class, _object_sha) as a single tuple-valued property.
    object = property(_get_object, _set_object)

    # NOTE(review): serializable_property is defined earlier in this file;
    # these appear to proxy the matching underscore-prefixed slots and mark
    # the object for reserialization on assignment — confirm at its definition.
    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")
    def sign(self, keyid: str | None = None) -> None:
        """Sign this tag with a GPG key.

        Stores the detached, ASCII-armored signature in ``self.signature``.

        Args:
            keyid: Optional GPG key ID to use for signing. If not specified,
                the default GPG key will be used.
        """
        # Imported lazily so gpg is only required when actually signing.
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                # A second context restricts the signer list to just the
                # requested key.
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.signature, _unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.signature, _unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )
1163 def raw_without_sig(self) -> bytes:
1164 """Return raw string serialization without the GPG/SSH signature.
1166 self.signature is a signature for the returned raw byte string serialization.
1167 """
1168 ret = self.as_raw_string()
1169 if self._signature:
1170 ret = ret[: -len(self._signature)]
1171 return ret
1173 def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
1174 """Extract the payload, signature, and signature type from this tag.
1176 Returns:
1177 tuple of (``payload``, ``signature``, ``signature_type``) where:
1179 - ``payload``: The raw tag data without the signature
1180 - ``signature``: The signature bytes if present, None otherwise
1181 - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature
1183 Raises:
1184 ObjectFormatException: If signature has unknown format
1185 """
1186 if self._signature is None:
1187 return self.as_raw_string(), None, None
1189 payload = self.raw_without_sig()
1191 # Determine signature type
1192 if self._signature.startswith(BEGIN_PGP_SIGNATURE):
1193 sig_type = SIGNATURE_PGP
1194 elif self._signature.startswith(BEGIN_SSH_SIGNATURE):
1195 sig_type = SIGNATURE_SSH
1196 else:
1197 raise ObjectFormatException("Unknown signature format")
1199 return payload, self._signature, sig_type
    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
            keyids: Optional iterable of trusted keyids for this tag.
                If this tag is not signed by any key in keyids verification will
                fail. If not specified, this function only verifies that the tag
                has a valid signature.

        Raises:
            gpg.errors.BadSignatures: if GPG signature verification fails
            gpg.errors.MissingSignatures: if tag was not signed by a key
                specified in keyids
        """
        # Unsigned tags trivially pass verification.
        if self._signature is None:
            return

        # Imported lazily so gpg is only required when actually verifying.
        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises BadSignatures when the signature is invalid.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # matches one of the signature fingerprints.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    # Entry name within its containing tree.
    path: bytes
    # File mode bits (e.g. 0o100644 for a regular file).
    mode: int
    # Hex SHA of the referenced object.
    sha: ObjectID

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
            path: Directory path to prepend to this entry's path.

        Raises:
            TypeError: if this entry's own path is not bytes.
        """
        if not isinstance(self.path, bytes):
            # Bug fix: report the value that actually failed the check
            # (self.path); the previous message interpolated the unrelated
            # *path* argument, producing a misleading error.
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
def parse_tree(
    text: bytes, strict: bool = False
) -> Iterator[tuple[bytes, int, ObjectID]]:
    """Parse a serialized tree body.

    Args:
      text: Serialized text to parse
      strict: If True, reject octal modes with a leading zero
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    # Each entry is: "<octal mode> <name>\0<20 raw sha bytes>".
    while pos < end:
        mode_end = text.index(b" ", pos)
        mode_text = text[pos:mode_end]
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
        name_end = text.index(b"\0", mode_end)
        name = text[mode_end + 1 : name_end]
        pos = name_end + 21
        raw_sha = text[name_end + 1 : pos]
        if len(raw_sha) != 20:
            raise ObjectFormatException("Sha has invalid length")
        yield (name, mode, sha_to_hex(RawObjectID(raw_sha)))
def serialize_tree(items: Iterable[tuple[bytes, int, ObjectID]]) -> Iterator[bytes]:
    """Serialize tree items to raw tree-body chunks.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        mode_bytes = f"{mode:04o}".encode("ascii")
        yield mode_bytes + b" " + name + b"\0" + hex_to_sha(hexsha)
def sorted_tree_items(
    entries: dict[bytes, tuple[int, ObjectID]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)
    """
    key_func = key_entry_name_order if name_order else key_entry
    for name, (mode, hexsha) in sorted(entries.items(), key=key_func):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry, in git tree order.

    Directories sort as if their name carried a trailing slash.

    Args:
      entry: (name, value) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for a tree entry in plain name order."""
    name, _value = entry
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: ObjectID, encoding: str = "utf-8"
) -> str:
    """Pretty format a single tree entry for display.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name
    Returns: string describing the tree entry
    """
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    decoded_name = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {hexsha.decode('ascii')}\t{decoded_name}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Initialize SubmoduleEncountered.

        Args:
            path: Path at which the submodule entry was found
            sha: SHA of the submodule
        """
        self.path = path
        self.sha = sha
class Tree(ShaFile):
    """A Git tree object."""

    type_name = b"tree"
    type_num = 2

    # Single slot: the name -> (mode, hexsha) mapping backing the tree.
    __slots__ = "_entries"

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, ObjectID]] = {}

    @classmethod
    def from_path(cls, filename: str | bytes) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry (mode, hexsha) by name."""
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as a string.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            a string.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        # Invalidate any cached serialization.
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name."""
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: ObjectID) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as a string.
          hexsha: The hex SHA of the entry as a string.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree."""
        try:
            parsed_entries = parse_tree(b"".join(chunks))
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # TODO: list comprehension is for efficiency in the common (small)
        # case; if memory efficiency in the large case is a concern, use a
        # genexp.
        self._entries = {n: (m, s) for n, m, s in parsed_entries}

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        # Only these file modes are considered valid in a tree entry.
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            # Entries must be strictly sorted in tree order, with no
            # duplicate names.
            entry = (name, (mode, sha))
            if last:
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        # Serialize entries in canonical git tree order.
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup

        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if a gitlink entry is crossed mid-path
          NotTreeError: if an intermediate component is not a tree
          ValueError: if the path resolves to no entry
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            if not p:
                # Skip empty components (leading/trailing/double slashes).
                continue
            if mode is not None and S_ISGITLINK(mode):
                # Cannot descend into a submodule; report where it was found.
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. ``b'+0100'``).

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
        and a boolean indicating whether this was a UTC timezone
        prefixed with a negative sign (-0000).

    Raises:
      ValueError: if the fragment does not start with '+' or '-' or the
        remainder is not an integer.
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    if text[0] not in b"+-":
        # f-string replaces the old `.format(**vars())` trick; output is
        # identical (str() of the bytes value).
        raise ValueError(f"Timezone must start with + or - ({text})")
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # b"-0000" (and any "-" with non-negative digits) is UTC written
    # negatively; remember that so round-tripping preserves it.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = ((offset < 0) and -1) or 1
    offset = abs(offset)
    # The textual offset is HHMM, not seconds; // avoids the previous
    # float division + int() round-trip.
    hours = offset // 100
    minutes = offset % 100
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).

    Returns: the timezone as ``b"+HHMM"`` / ``b"-HHMM"`` bytes.

    Raises:
      ValueError: if the offset is not a whole number of minutes.
    """
    if offset % 60 != 0:
        raise ValueError("Unable to handle non-minute offset.")
    if offset < 0 or unnecessary_negative_timezone:
        sign = "-"
        offset = -offset
    else:
        sign = "+"
    # Integer division (//) replaces the old float divisions funneled
    # through "%02d"; output is byte-identical for minute-aligned offsets.
    return f"{sign}{offset // 3600:02d}{(offset // 60) % 60:02d}".encode("ascii")
1614def parse_time_entry(
1615 value: bytes,
1616) -> tuple[bytes, int | None, tuple[int | None, bool]]:
1617 """Parse event.
1619 Args:
1620 value: Bytes representing a git commit/tag line
1621 Raises:
1622 ObjectFormatException in case of parsing error (malformed
1623 field date)
1624 Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
1625 """
1626 try:
1627 sep = value.rindex(b"> ")
1628 except ValueError:
1629 return (value, None, (None, False))
1630 try:
1631 person = value[0 : sep + 1]
1632 rest = value[sep + 2 :]
1633 timetext, timezonetext = rest.rsplit(b" ", 1)
1634 time = int(timetext)
1635 timezone, timezone_neg_utc = parse_timezone(timezonetext)
1636 except ValueError as exc:
1637 raise ObjectFormatException(exc) from exc
1638 return person, time, (timezone, timezone_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an identity line: ``person`` + timestamp + timezone."""
    timezone, timezone_neg_utc = timezone_info
    tz_bytes = format_timezone(timezone, timezone_neg_utc)
    return person + b" " + str(time).encode("ascii") + b" " + tz_bytes
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Deprecated (see the ``replace_me`` decorator above); kept for
    backwards compatibility.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)

    Raises:
      ObjectFormatException: if a required header has no value
    """
    parents = []
    extra = []
    tree = None
    # Identity tuples default to all-None until the header is seen.
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_time_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_time_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # The embedded tag lost its trailing newline during header
            # parsing; restore it before re-parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            # None field marks the commit message body.
            message = value
        else:
            # Unknown headers are preserved verbatim in `extra`.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )
class Commit(ShaFile):
    """A git commit object."""

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Initialize an empty Commit."""
        super().__init__()
        self._parents: list[ObjectID] = []
        self._encoding: bytes | None = None
        self._mergetag: list[Tag] = []
        self._gpgsig: bytes | None = None
        self._extra: list[tuple[bytes, bytes | None]] = []
        self._author_timezone_neg_utc: bool | None = False
        self._commit_timezone_neg_utc: bool | None = False

    @classmethod
    def from_path(cls, path: str | bytes) -> "Commit":
        """Read a commit from a file on disk.

        Args:
          path: Path to the commit file

        Returns:
          A Commit object

        Raises:
          NotCommitError: If the file is not a commit
        """
        commit = ShaFile.from_path(path)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Parse the serialized commit body and populate all fields."""
        self._parents = []
        self._extra = []
        self._tree = None
        # Identity tuples default to all-None until the header is seen.
        author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
            None,
            None,
            (None, None),
        )
        self._encoding = None
        self._mergetag = []
        self._message = None
        self._gpgsig = None

        for field, value in _parse_message(chunks):
            # TODO(jelmer): Enforce ordering
            if field == _TREE_HEADER:
                self._tree = value
            elif field == _PARENT_HEADER:
                assert value is not None
                self._parents.append(ObjectID(value))
            elif field == _AUTHOR_HEADER:
                if value is None:
                    raise ObjectFormatException("missing author value")
                author_info = parse_time_entry(value)
            elif field == _COMMITTER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing committer value")
                commit_info = parse_time_entry(value)
            elif field == _ENCODING_HEADER:
                self._encoding = value
            elif field == _MERGETAG_HEADER:
                assert value is not None
                # Restore the trailing newline stripped during header parsing.
                tag = Tag.from_string(value + b"\n")
                assert isinstance(tag, Tag)
                self._mergetag.append(tag)
            elif field == _GPGSIG_HEADER:
                self._gpgsig = value
            elif field is None:
                # None field marks the commit message body.
                self._message = value
            else:
                # Unknown headers are preserved verbatim so the object
                # round-trips unchanged.
                self._extra.append((field, value))

        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Verify header ordering: tree first, then parents, author,
        # committer, encoding.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: str | None = None) -> None:
        """Sign this commit with a GPG key.

        Stores the detached, ASCII-armored signature in ``self.gpgsig``.

        Args:
          keyid: Optional GPG key ID to use for signing. If not specified,
            the default GPG key will be used.
        """
        # Imported lazily so gpg is only required when actually signing.
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                key = c.get_key(keyid)
                # A second context restricts the signer list to just the
                # requested key.
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, _unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, _unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Work on a copy so this commit's own state is untouched.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        # Redundant with the slot assignment above, but also marks the copy
        # for reserialization via the gpgsig property.
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this commit.

        Returns:
          tuple of (``payload``, ``signature``, ``signature_type``) where:

          - ``payload``: The raw commit data without the signature
          - ``signature``: The signature bytes if present, None otherwise
          - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
          ObjectFormatException: If signature has unknown format
        """
        if self._gpgsig is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Determine signature type from the opening marker.
        if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE):
            sig_type = SIGNATURE_PGP
        elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE):
            sig_type = SIGNATURE_SSH
        else:
            raise ObjectFormatException("Unknown signature format")

        return payload, self._gpgsig, sig_type

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        # Unsigned commits trivially pass verification.
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            # ctx.verify raises BadSignatures when the signature is invalid.
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept if any signing-capable subkey of a trusted key
                # matches one of the signature fingerprints.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        # Emit headers in canonical order (mirrors the ordering enforced
        # by check()): tree, parents, author, committer, encoding,
        # mergetags, extras, gpgsig, then the message body.
        headers = []
        assert self._tree is not None
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Drop the trailing newline; it is re-added on deserialization.
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[ObjectID]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[ObjectID]) -> None:
        """Set a list of parents of this commit."""
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, bytes | None]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
# All concrete object classes exposed by this module.
OBJECT_CLASSES = (
    Commit,
    Tree,
    Blob,
    Tag,
)

# Maps both the textual type name (e.g. b"tree") and the numeric type
# (e.g. 2) of each class to the class itself.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP[cls.type_name] = cls
    _TYPE_MAP[cls.type_num] = cls
# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
try:
    # Try to import Rust versions
    from dulwich._objects import (
        parse_tree as _parse_tree_rs,
    )
    from dulwich._objects import (
        sorted_tree_items as _sorted_tree_items_rs,
    )
except ImportError:
    # Extension not built/installed: keep the pure-Python implementations.
    pass
else:
    # Rust extension available: rebind the hot-path helpers to it.
    parse_tree = _parse_tree_rs
    sorted_tree_items = _sorted_tree_items_rs