1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
5
6__all__ = ["Commit"]
7
8from collections import defaultdict
9import datetime
10from io import BytesIO
11import logging
12import os
13import re
14from subprocess import Popen, PIPE
15import sys
16from time import altzone, daylight, localtime, time, timezone
17import warnings
18
19from gitdb import IStream
20
21from git.cmd import Git
22from git.diff import Diffable
23from git.util import Actor, Stats, finalize_process, hex_to_bin
24
25from . import base
26from .tree import Tree
27from .util import (
28 Serializable,
29 TraversableIterableObj,
30 altz_to_utctz_str,
31 from_timestamp,
32 parse_actor_and_date,
33 parse_date,
34)
35
36# typing ------------------------------------------------------------------
37
38from typing import (
39 Any,
40 Dict,
41 IO,
42 Iterator,
43 List,
44 Sequence,
45 Tuple,
46 TYPE_CHECKING,
47 Union,
48 cast,
49)
50
51if sys.version_info >= (3, 8):
52 from typing import Literal
53else:
54 from typing_extensions import Literal
55
56from git.types import PathLike
57
58if TYPE_CHECKING:
59 from git.refs import SymbolicReference
60 from git.repo import Repo
61
62# ------------------------------------------------------------------------
63
64_logger = logging.getLogger(__name__)
65
66
class Commit(base.Object, TraversableIterableObj, Diffable, Serializable):
    """Wraps a git commit object.

    See :manpage:`gitglossary(7)` on "commit object":
    https://git-scm.com/docs/gitglossary#def_commit_object

    :note:
        The fields named in ``__slots__`` are filled lazily: when one of them is
        requested through :meth:`_set_cache_`, the commit is read from the
        repository's object database and parsed by :meth:`_deserialize`, which sets
        all of them at once.
    """

    # ENVIRONMENT VARIABLES
    # Names of the environment variables consulted by create_from_tree() for the
    # author/committer dates when no explicit date argument is passed.
    env_author_date = "GIT_AUTHOR_DATE"
    env_committer_date = "GIT_COMMITTER_DATE"

    # CONFIGURATION KEYS
    # "<section>.<option>" key looked up in the repository configuration by
    # create_from_tree() to determine the encoding of a new commit.
    conf_encoding = "i18n.commitencoding"

    # INVARIANTS
    # Encoding assumed when neither the configuration nor the commit says otherwise.
    # _serialize() writes an "encoding" header only if the commit's encoding differs.
    default_encoding = "UTF-8"

    # Object type name; passed to IStream when the commit is stored.
    type: Literal["commit"] = "commit"

    # Per-commit fields. These are the constructor's optional arguments, the
    # attributes copied by replace(), and the names _set_cache_() treats as loadable
    # from the object database.
    __slots__ = (
        "tree",
        "author",
        "authored_date",
        "author_tz_offset",
        "committer",
        "committed_date",
        "committer_tz_offset",
        "message",
        "parents",
        "encoding",
        "gpgsig",
    )

    # NOTE(review): presumably consumed by a base class to identify items of this
    # type by their hex sha - the consumer is not visible in this file.
    _id_attribute_ = "hexsha"

    # Direct parents of this commit; _deserialize() stores them as a tuple.
    parents: Sequence["Commit"]
108
109 def __init__(
110 self,
111 repo: "Repo",
112 binsha: bytes,
113 tree: Union[Tree, None] = None,
114 author: Union[Actor, None] = None,
115 authored_date: Union[int, None] = None,
116 author_tz_offset: Union[None, float] = None,
117 committer: Union[Actor, None] = None,
118 committed_date: Union[int, None] = None,
119 committer_tz_offset: Union[None, float] = None,
120 message: Union[str, bytes, None] = None,
121 parents: Union[Sequence["Commit"], None] = None,
122 encoding: Union[str, None] = None,
123 gpgsig: Union[str, None] = None,
124 ) -> None:
125 """Instantiate a new :class:`Commit`. All keyword arguments taking ``None`` as
126 default will be implicitly set on first query.
127
128 :param binsha:
129 20 byte sha1.
130
131 :param tree:
132 A :class:`~git.objects.tree.Tree` object.
133
134 :param author:
135 The author :class:`~git.util.Actor` object.
136
137 :param authored_date: int_seconds_since_epoch
138 The authored DateTime - use :func:`time.gmtime` to convert it into a
139 different format.
140
141 :param author_tz_offset: int_seconds_west_of_utc
142 The timezone that the `authored_date` is in.
143
144 :param committer:
145 The committer string, as an :class:`~git.util.Actor` object.
146
147 :param committed_date: int_seconds_since_epoch
148 The committed DateTime - use :func:`time.gmtime` to convert it into a
149 different format.
150
151 :param committer_tz_offset: int_seconds_west_of_utc
152 The timezone that the `committed_date` is in.
153
154 :param message: string
155 The commit message.
156
157 :param encoding: string
158 Encoding of the message, defaults to UTF-8.
159
160 :param parents:
161 List or tuple of :class:`Commit` objects which are our parent(s) in the
162 commit dependency graph.
163
164 :return:
165 :class:`Commit`
166
167 :note:
168 Timezone information is in the same format and in the same sign as what
169 :func:`time.altzone` returns. The sign is inverted compared to git's UTC
170 timezone.
171 """
172 super().__init__(repo, binsha)
173 self.binsha = binsha
174 if tree is not None:
175 assert isinstance(tree, Tree), "Tree needs to be a Tree instance, was %s" % type(tree)
176 if tree is not None:
177 self.tree = tree
178 if author is not None:
179 self.author = author
180 if authored_date is not None:
181 self.authored_date = authored_date
182 if author_tz_offset is not None:
183 self.author_tz_offset = author_tz_offset
184 if committer is not None:
185 self.committer = committer
186 if committed_date is not None:
187 self.committed_date = committed_date
188 if committer_tz_offset is not None:
189 self.committer_tz_offset = committer_tz_offset
190 if message is not None:
191 self.message = message
192 if parents is not None:
193 self.parents = parents
194 if encoding is not None:
195 self.encoding = encoding
196 if gpgsig is not None:
197 self.gpgsig = gpgsig
198
199 @classmethod
200 def _get_intermediate_items(cls, commit: "Commit") -> Tuple["Commit", ...]:
201 return tuple(commit.parents)
202
203 @classmethod
204 def _calculate_sha_(cls, repo: "Repo", commit: "Commit") -> bytes:
205 """Calculate the sha of a commit.
206
207 :param repo:
208 :class:`~git.repo.base.Repo` object the commit should be part of.
209
210 :param commit:
211 :class:`Commit` object for which to generate the sha.
212 """
213
214 stream = BytesIO()
215 commit._serialize(stream)
216 streamlen = stream.tell()
217 stream.seek(0)
218
219 istream = repo.odb.store(IStream(cls.type, streamlen, stream))
220 return istream.binsha
221
222 def replace(self, **kwargs: Any) -> "Commit":
223 """Create new commit object from an existing commit object.
224
225 Any values provided as keyword arguments will replace the corresponding
226 attribute in the new object.
227 """
228
229 attrs = {k: getattr(self, k) for k in self.__slots__}
230
231 for attrname in kwargs:
232 if attrname not in self.__slots__:
233 raise ValueError("invalid attribute name")
234
235 attrs.update(kwargs)
236 new_commit = self.__class__(self.repo, self.NULL_BIN_SHA, **attrs)
237 new_commit.binsha = self._calculate_sha_(self.repo, new_commit)
238
239 return new_commit
240
241 def _set_cache_(self, attr: str) -> None:
242 if attr in Commit.__slots__:
243 # Read the data in a chunk, its faster - then provide a file wrapper.
244 _binsha, _typename, self.size, stream = self.repo.odb.stream(self.binsha)
245 self._deserialize(BytesIO(stream.read()))
246 else:
247 super()._set_cache_(attr)
248 # END handle attrs
249
250 @property
251 def authored_datetime(self) -> datetime.datetime:
252 return from_timestamp(self.authored_date, self.author_tz_offset)
253
254 @property
255 def committed_datetime(self) -> datetime.datetime:
256 return from_timestamp(self.committed_date, self.committer_tz_offset)
257
258 @property
259 def summary(self) -> Union[str, bytes]:
260 """:return: First line of the commit message"""
261 if isinstance(self.message, str):
262 return self.message.split("\n", 1)[0]
263 else:
264 return self.message.split(b"\n", 1)[0]
265
266 def count(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> int:
267 """Count the number of commits reachable from this commit.
268
269 :param paths:
270 An optional path or a list of paths restricting the return value to commits
271 actually containing the paths.
272
273 :param kwargs:
274 Additional options to be passed to :manpage:`git-rev-list(1)`. They must not
275 alter the output style of the command, or parsing will yield incorrect
276 results.
277
278 :return:
279 An int defining the number of reachable commits
280 """
281 # Yes, it makes a difference whether empty paths are given or not in our case as
282 # the empty paths version will ignore merge commits for some reason.
283 if paths:
284 return len(self.repo.git.rev_list(self.hexsha, "--", paths, **kwargs).splitlines())
285 return len(self.repo.git.rev_list(self.hexsha, **kwargs).splitlines())
286
287 @property
288 def name_rev(self) -> str:
289 """
290 :return:
291 String describing the commits hex sha based on the closest
292 :class:`~git.refs.reference.Reference`.
293
294 :note:
295 Mostly useful for UI purposes.
296 """
297 return self.repo.git.name_rev(self)
298
299 @classmethod
300 def iter_items(
301 cls,
302 repo: "Repo",
303 rev: Union[str, "Commit", "SymbolicReference"],
304 paths: Union[PathLike, Sequence[PathLike]] = "",
305 **kwargs: Any,
306 ) -> Iterator["Commit"]:
307 R"""Find all commits matching the given criteria.
308
309 :param repo:
310 The :class:`~git.repo.base.Repo`.
311
312 :param rev:
313 Revision specifier. See :manpage:`git-rev-parse(1)` for viable options.
314
315 :param paths:
316 An optional path or list of paths. If set only :class:`Commit`\s that
317 include the path or paths will be considered.
318
319 :param kwargs:
320 Optional keyword arguments to :manpage:`git-rev-list(1)` where:
321
322 * ``max_count`` is the maximum number of commits to fetch.
323 * ``skip`` is the number of commits to skip.
324 * ``since`` selects all commits since some date, e.g. ``"1970-01-01"``.
325
326 :return:
327 Iterator yielding :class:`Commit` items.
328 """
329 if "pretty" in kwargs:
330 raise ValueError("--pretty cannot be used as parsing expects single sha's only")
331 # END handle pretty
332
333 # Use -- in all cases, to prevent possibility of ambiguous arguments.
334 # See https://github.com/gitpython-developers/GitPython/issues/264.
335
336 args_list: List[PathLike] = ["--"]
337
338 if paths:
339 paths_tup: Tuple[PathLike, ...]
340 if isinstance(paths, (str, os.PathLike)):
341 paths_tup = (paths,)
342 else:
343 paths_tup = tuple(paths)
344
345 args_list.extend(paths_tup)
346 # END if paths
347
348 proc = repo.git.rev_list(rev, args_list, as_process=True, **kwargs)
349 return cls._iter_from_process_or_stream(repo, proc)
350
351 def iter_parents(self, paths: Union[PathLike, Sequence[PathLike]] = "", **kwargs: Any) -> Iterator["Commit"]:
352 R"""Iterate *all* parents of this commit.
353
354 :param paths:
355 Optional path or list of paths limiting the :class:`Commit`\s to those that
356 contain at least one of the paths.
357
358 :param kwargs:
359 All arguments allowed by :manpage:`git-rev-list(1)`.
360
361 :return:
362 Iterator yielding :class:`Commit` objects which are parents of ``self``
363 """
364 # skip ourselves
365 skip = kwargs.get("skip", 1)
366 if skip == 0: # skip ourselves
367 skip = 1
368 kwargs["skip"] = skip
369
370 return self.iter_items(self.repo, self, paths, **kwargs)
371
372 @property
373 def stats(self) -> Stats:
374 """Create a git stat from changes between this commit and its first parent
375 or from all changes done if this is the very first commit.
376
377 :return:
378 :class:`Stats`
379 """
380
381 def process_lines(lines: List[str]) -> str:
382 text = ""
383 for file_info, line in zip(lines, lines[len(lines) // 2 :]):
384 change_type = file_info.split("\t")[0][-1]
385 (insertions, deletions, filename) = line.split("\t")
386 text += "%s\t%s\t%s\t%s\n" % (change_type, insertions, deletions, filename)
387 return text
388
389 if not self.parents:
390 lines = self.repo.git.diff_tree(
391 self.hexsha, "--", numstat=True, no_renames=True, root=True, raw=True
392 ).splitlines()[1:]
393 text = process_lines(lines)
394 else:
395 lines = self.repo.git.diff(
396 self.parents[0].hexsha, self.hexsha, "--", numstat=True, no_renames=True, raw=True
397 ).splitlines()
398 text = process_lines(lines)
399 return Stats._list_from_string(self.repo, text)
400
401 @property
402 def trailers(self) -> Dict[str, str]:
403 """Deprecated. Get the trailers of the message as a dictionary.
404
405 :note:
406 This property is deprecated, please use either :attr:`trailers_list` or
407 :attr:`trailers_dict`.
408
409 :return:
410 Dictionary containing whitespace stripped trailer information.
411 Only contains the latest instance of each trailer key.
412 """
413 warnings.warn(
414 "Commit.trailers is deprecated, use Commit.trailers_list or Commit.trailers_dict instead",
415 DeprecationWarning,
416 stacklevel=2,
417 )
418 return {k: v[0] for k, v in self.trailers_dict.items()}
419
420 @property
421 def trailers_list(self) -> List[Tuple[str, str]]:
422 """Get the trailers of the message as a list.
423
424 Git messages can contain trailer information that are similar to :rfc:`822`
425 e-mail headers. See :manpage:`git-interpret-trailers(1)`.
426
427 This function calls ``git interpret-trailers --parse`` onto the message to
428 extract the trailer information, returns the raw trailer data as a list.
429
430 Valid message with trailer::
431
432 Subject line
433
434 some body information
435
436 another information
437
438 key1: value1.1
439 key1: value1.2
440 key2 : value 2 with inner spaces
441
442 Returned list will look like this::
443
444 [
445 ("key1", "value1.1"),
446 ("key1", "value1.2"),
447 ("key2", "value 2 with inner spaces"),
448 ]
449
450 :return:
451 List containing key-value tuples of whitespace stripped trailer information.
452 """
453 cmd = ["git", "interpret-trailers", "--parse"]
454 proc: Git.AutoInterrupt = self.repo.git.execute( # type: ignore[call-overload]
455 cmd,
456 as_process=True,
457 istream=PIPE,
458 )
459 trailer: str = proc.communicate(str(self.message).encode())[0].decode("utf8")
460 trailer = trailer.strip()
461
462 if not trailer:
463 return []
464
465 trailer_list = []
466 for t in trailer.split("\n"):
467 key, val = t.split(":", 1)
468 trailer_list.append((key.strip(), val.strip()))
469
470 return trailer_list
471
472 @property
473 def trailers_dict(self) -> Dict[str, List[str]]:
474 """Get the trailers of the message as a dictionary.
475
476 Git messages can contain trailer information that are similar to :rfc:`822`
477 e-mail headers. See :manpage:`git-interpret-trailers(1)`.
478
479 This function calls ``git interpret-trailers --parse`` onto the message to
480 extract the trailer information. The key value pairs are stripped of leading and
481 trailing whitespaces before they get saved into a dictionary.
482
483 Valid message with trailer::
484
485 Subject line
486
487 some body information
488
489 another information
490
491 key1: value1.1
492 key1: value1.2
493 key2 : value 2 with inner spaces
494
495 Returned dictionary will look like this::
496
497 {
498 "key1": ["value1.1", "value1.2"],
499 "key2": ["value 2 with inner spaces"],
500 }
501
502
503 :return:
504 Dictionary containing whitespace stripped trailer information, mapping
505 trailer keys to a list of their corresponding values.
506 """
507 d = defaultdict(list)
508 for key, val in self.trailers_list:
509 d[key].append(val)
510 return dict(d)
511
512 @classmethod
513 def _iter_from_process_or_stream(cls, repo: "Repo", proc_or_stream: Union[Popen, IO]) -> Iterator["Commit"]:
514 """Parse out commit information into a list of :class:`Commit` objects.
515
516 We expect one line per commit, and parse the actual commit information directly
517 from our lighting fast object database.
518
519 :param proc:
520 :manpage:`git-rev-list(1)` process instance - one sha per line.
521
522 :return:
523 Iterator supplying :class:`Commit` objects
524 """
525
526 # def is_proc(inp) -> TypeGuard[Popen]:
527 # return hasattr(proc_or_stream, 'wait') and not hasattr(proc_or_stream, 'readline')
528
529 # def is_stream(inp) -> TypeGuard[IO]:
530 # return hasattr(proc_or_stream, 'readline')
531
532 if hasattr(proc_or_stream, "wait"):
533 proc_or_stream = cast(Popen, proc_or_stream)
534 if proc_or_stream.stdout is not None:
535 stream = proc_or_stream.stdout
536 elif hasattr(proc_or_stream, "readline"):
537 proc_or_stream = cast(IO, proc_or_stream) # type: ignore[redundant-cast]
538 stream = proc_or_stream
539
540 readline = stream.readline
541 while True:
542 line = readline()
543 if not line:
544 break
545 hexsha = line.strip()
546 if len(hexsha) > 40:
547 # Split additional information, as returned by bisect for instance.
548 hexsha, _ = line.split(None, 1)
549 # END handle extra info
550
551 assert len(hexsha) == 40, "Invalid line: %s" % hexsha
552 yield cls(repo, hex_to_bin(hexsha))
553 # END for each line in stream
554
555 # TODO: Review this - it seems process handling got a bit out of control due to
556 # many developers trying to fix the open file handles issue.
557 if hasattr(proc_or_stream, "wait"):
558 proc_or_stream = cast(Popen, proc_or_stream)
559 finalize_process(proc_or_stream)
560
561 @classmethod
562 def create_from_tree(
563 cls,
564 repo: "Repo",
565 tree: Union[Tree, str],
566 message: str,
567 parent_commits: Union[None, List["Commit"]] = None,
568 head: bool = False,
569 author: Union[None, Actor] = None,
570 committer: Union[None, Actor] = None,
571 author_date: Union[None, str, datetime.datetime] = None,
572 commit_date: Union[None, str, datetime.datetime] = None,
573 ) -> "Commit":
574 """Commit the given tree, creating a :class:`Commit` object.
575
576 :param repo:
577 :class:`~git.repo.base.Repo` object the commit should be part of.
578
579 :param tree:
580 :class:`~git.objects.tree.Tree` object or hex or bin sha.
581 The tree of the new commit.
582
583 :param message:
584 Commit message. It may be an empty string if no message is provided. It will
585 be converted to a string, in any case.
586
587 :param parent_commits:
588 Optional :class:`Commit` objects to use as parents for the new commit. If
589 empty list, the commit will have no parents at all and become a root commit.
590 If ``None``, the current head commit will be the parent of the new commit
591 object.
592
593 :param head:
594 If ``True``, the HEAD will be advanced to the new commit automatically.
595 Otherwise the HEAD will remain pointing on the previous commit. This could
596 lead to undesired results when diffing files.
597
598 :param author:
599 The name of the author, optional.
600 If unset, the repository configuration is used to obtain this value.
601
602 :param committer:
603 The name of the committer, optional.
604 If unset, the repository configuration is used to obtain this value.
605
606 :param author_date:
607 The timestamp for the author field.
608
609 :param commit_date:
610 The timestamp for the committer field.
611
612 :return:
613 :class:`Commit` object representing the new commit.
614
615 :note:
616 Additional information about the committer and author are taken from the
617 environment or from the git configuration. See :manpage:`git-commit-tree(1)`
618 for more information.
619 """
620 if parent_commits is None:
621 try:
622 parent_commits = [repo.head.commit]
623 except ValueError:
624 # Empty repositories have no head commit.
625 parent_commits = []
626 # END handle parent commits
627 else:
628 for p in parent_commits:
629 if not isinstance(p, cls):
630 raise ValueError(f"Parent commit '{p!r}' must be of type {cls}")
631 # END check parent commit types
632 # END if parent commits are unset
633
634 # Retrieve all additional information, create a commit object, and serialize it.
635 # Generally:
636 # * Environment variables override configuration values.
637 # * Sensible defaults are set according to the git documentation.
638
639 # COMMITTER AND AUTHOR INFO
640 cr = repo.config_reader()
641 env = os.environ
642
643 committer = committer or Actor.committer(cr)
644 author = author or Actor.author(cr)
645
646 # PARSE THE DATES
647 unix_time = int(time())
648 is_dst = daylight and localtime().tm_isdst > 0
649 offset = altzone if is_dst else timezone
650
651 author_date_str = env.get(cls.env_author_date, "")
652 if author_date:
653 author_time, author_offset = parse_date(author_date)
654 elif author_date_str:
655 author_time, author_offset = parse_date(author_date_str)
656 else:
657 author_time, author_offset = unix_time, offset
658 # END set author time
659
660 committer_date_str = env.get(cls.env_committer_date, "")
661 if commit_date:
662 committer_time, committer_offset = parse_date(commit_date)
663 elif committer_date_str:
664 committer_time, committer_offset = parse_date(committer_date_str)
665 else:
666 committer_time, committer_offset = unix_time, offset
667 # END set committer time
668
669 # Assume UTF-8 encoding.
670 enc_section, enc_option = cls.conf_encoding.split(".")
671 conf_encoding = cr.get_value(enc_section, enc_option, cls.default_encoding)
672 if not isinstance(conf_encoding, str):
673 raise TypeError("conf_encoding could not be coerced to str")
674
675 # If the tree is no object, make sure we create one - otherwise the created
676 # commit object is invalid.
677 if isinstance(tree, str):
678 tree = repo.tree(tree)
679 # END tree conversion
680
681 # CREATE NEW COMMIT
682 new_commit = cls(
683 repo,
684 cls.NULL_BIN_SHA,
685 tree,
686 author,
687 author_time,
688 author_offset,
689 committer,
690 committer_time,
691 committer_offset,
692 message,
693 parent_commits,
694 conf_encoding,
695 )
696
697 new_commit.binsha = cls._calculate_sha_(repo, new_commit)
698
699 if head:
700 # Need late import here, importing git at the very beginning throws as
701 # well...
702 import git.refs
703
704 try:
705 repo.head.set_commit(new_commit, logmsg=message)
706 except ValueError:
707 # head is not yet set to the ref our HEAD points to.
708 # Happens on first commit.
709 master = git.refs.Head.create(
710 repo,
711 repo.head.ref,
712 new_commit,
713 logmsg="commit (initial): %s" % message,
714 )
715 repo.head.set_reference(master, logmsg="commit: Switching to %s" % master)
716 # END handle empty repositories
717 # END advance head handling
718
719 return new_commit
720
721 # { Serializable Implementation
722
723 def _serialize(self, stream: BytesIO) -> "Commit":
724 write = stream.write
725 write(("tree %s\n" % self.tree).encode("ascii"))
726 for p in self.parents:
727 write(("parent %s\n" % p).encode("ascii"))
728
729 a = self.author
730 aname = a.name
731 c = self.committer
732 fmt = "%s %s <%s> %s %s\n"
733 write(
734 (
735 fmt
736 % (
737 "author",
738 aname,
739 a.email,
740 self.authored_date,
741 altz_to_utctz_str(self.author_tz_offset),
742 )
743 ).encode(self.encoding)
744 )
745
746 # Encode committer.
747 aname = c.name
748 write(
749 (
750 fmt
751 % (
752 "committer",
753 aname,
754 c.email,
755 self.committed_date,
756 altz_to_utctz_str(self.committer_tz_offset),
757 )
758 ).encode(self.encoding)
759 )
760
761 if self.encoding != self.default_encoding:
762 write(("encoding %s\n" % self.encoding).encode("ascii"))
763
764 try:
765 if self.__getattribute__("gpgsig"):
766 write(b"gpgsig")
767 for sigline in self.gpgsig.rstrip("\n").split("\n"):
768 write((" " + sigline + "\n").encode("ascii"))
769 except AttributeError:
770 pass
771
772 write(b"\n")
773
774 # Write plain bytes, be sure its encoded according to our encoding.
775 if isinstance(self.message, str):
776 write(self.message.encode(self.encoding))
777 else:
778 write(self.message)
779 # END handle encoding
780 return self
781
782 def _deserialize(self, stream: BytesIO) -> "Commit":
783 readline = stream.readline
784 self.tree = Tree(self.repo, hex_to_bin(readline().split()[1]), Tree.tree_id << 12, "")
785
786 self.parents = []
787 next_line = None
788 while True:
789 parent_line = readline()
790 if not parent_line.startswith(b"parent"):
791 next_line = parent_line
792 break
793 # END abort reading parents
794 self.parents.append(type(self)(self.repo, hex_to_bin(parent_line.split()[-1].decode("ascii"))))
795 # END for each parent line
796 self.parents = tuple(self.parents)
797
798 # We don't know actual author encoding before we have parsed it, so keep the
799 # lines around.
800 author_line = next_line
801 committer_line = readline()
802
803 # We might run into one or more mergetag blocks, skip those for now.
804 next_line = readline()
805 while next_line.startswith(b"mergetag "):
806 next_line = readline()
807 while next_line.startswith(b" "):
808 next_line = readline()
809 # END skip mergetags
810
811 # Now we can have the encoding line, or an empty line followed by the optional
812 # message.
813 self.encoding = self.default_encoding
814 self.gpgsig = ""
815
816 # Read headers.
817 enc = next_line
818 buf = enc.strip()
819 while buf:
820 if buf[0:10] == b"encoding ":
821 self.encoding = buf[buf.find(b" ") + 1 :].decode(self.encoding, "ignore")
822 elif buf[0:7] == b"gpgsig ":
823 sig = buf[buf.find(b" ") + 1 :] + b"\n"
824 is_next_header = False
825 while True:
826 sigbuf = readline()
827 if not sigbuf:
828 break
829 if sigbuf[0:1] != b" ":
830 buf = sigbuf.strip()
831 is_next_header = True
832 break
833 sig += sigbuf[1:]
834 # END read all signature
835 self.gpgsig = sig.rstrip(b"\n").decode(self.encoding, "ignore")
836 if is_next_header:
837 continue
838 buf = readline().strip()
839
840 # Decode the author's name.
841 try:
842 (
843 self.author,
844 self.authored_date,
845 self.author_tz_offset,
846 ) = parse_actor_and_date(author_line.decode(self.encoding, "replace"))
847 except UnicodeDecodeError:
848 _logger.error(
849 "Failed to decode author line '%s' using encoding %s",
850 author_line,
851 self.encoding,
852 exc_info=True,
853 )
854
855 try:
856 (
857 self.committer,
858 self.committed_date,
859 self.committer_tz_offset,
860 ) = parse_actor_and_date(committer_line.decode(self.encoding, "replace"))
861 except UnicodeDecodeError:
862 _logger.error(
863 "Failed to decode committer line '%s' using encoding %s",
864 committer_line,
865 self.encoding,
866 exc_info=True,
867 )
868 # END handle author's encoding
869
870 # A stream from our data simply gives us the plain message.
871 # The end of our message stream is marked with a newline that we strip.
872 self.message = stream.read()
873 try:
874 self.message = self.message.decode(self.encoding, "replace")
875 except UnicodeDecodeError:
876 _logger.error(
877 "Failed to decode message '%s' using encoding %s",
878 self.message,
879 self.encoding,
880 exc_info=True,
881 )
882 # END exception handling
883
884 return self
885
886 # } END serializable implementation
887
888 @property
889 def co_authors(self) -> List[Actor]:
890 """Search the commit message for any co-authors of this commit.
891
892 Details on co-authors:
893 https://github.blog/2018-01-29-commit-together-with-co-authors/
894
895 :return:
896 List of co-authors for this commit (as :class:`~git.util.Actor` objects).
897 """
898 co_authors = []
899
900 if self.message:
901 results = re.findall(
902 r"^Co-authored-by: (.*) <(.*?)>$",
903 self.message,
904 re.MULTILINE,
905 )
906 for author in results:
907 co_authors.append(Actor(*author))
908
909 return co_authors