1# Copyright 2023 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Rekor Checkpoint machinery.
17"""
18
19from __future__ import annotations
20
21import base64
22import re
23import struct
24import typing
25from dataclasses import dataclass
26
27from pydantic import BaseModel, Field, StrictStr
28
29from sigstore._utils import KeyID
30from sigstore.errors import VerificationError
31
32if typing.TYPE_CHECKING:
33 from sigstore._internal.trust import RekorKeyring
34 from sigstore.models import LogEntry
35
36
@dataclass(frozen=True)
class RekorSignature:
    """
    One signature line taken from the signature block of a signed note.

    Instances are immutable (frozen dataclass) and carry three pieces:

    - `name`: the signer's name, e.g. "rekor.sigstage.dev"
    - `sig_hash`: the key hint, i.e. the leading 4 bytes of the decoded signature blob
    - `signature`: the remainder of the signature blob, base64-encoded
    """

    # Name of the signer this signature is attributed to.
    name: str
    # Key hint bytes that prefix the raw signature.
    sig_hash: bytes
    # Base64 encoding of the signature proper (hint removed).
    signature: bytes
50
51
class LogCheckpoint(BaseModel):
    """
    Represents a Rekor `LogCheckpoint` containing:

    - an origin, e.g. "rekor.sigstage.dev - 8050909264565447525"
    - the size of the log,
    - the hash of the log,
    - and any optional ancillary constants, e.g. "Timestamp: 1679349379012118479"

    See: <https://github.com/transparency-dev/formats/blob/main/log/README.md>
    """

    origin: StrictStr
    log_size: int
    # Hex string; `from_text` produces it by base64-decoding the third header line.
    log_hash: StrictStr
    # Every header line after the first three, kept verbatim.
    other_content: list[str]

    @classmethod
    def from_text(cls, text: str) -> LogCheckpoint:
        """
        Deserialize from the text header ("note") of a SignedNote.

        The header is stripped of surrounding whitespace and split on
        newlines; the first three lines are the origin, the log size (decimal)
        and the root hash (base64). Any further lines are stored unparsed in
        `other_content`.

        Raises `VerificationError` if there are fewer than three lines or the
        origin is empty. A non-integer size or invalid base64 hash propagates
        the underlying `ValueError` from `int` / `base64.b64decode`.
        """

        lines = text.strip().split("\n")
        if len(lines) < 3:
            raise VerificationError("malformed LogCheckpoint: too few items in header")

        origin = lines[0]
        if not origin:
            raise VerificationError("malformed LogCheckpoint: empty origin")

        log_size = int(lines[1])
        # The hash is kept in hex form on the model, not in its base64 wire form.
        root_hash = base64.b64decode(lines[2]).hex()

        return LogCheckpoint(
            origin=origin,
            log_size=log_size,
            log_hash=root_hash,
            other_content=lines[3:],
        )

    def to_text(self) -> str:
        """
        Serialize a `LogCheckpoint` into text format.
        See class definition for a prose description of the format.

        This is an instance method: it joins this checkpoint's own fields
        with newlines, in the order origin, size, hash, ancillary lines.

        NOTE(review): `log_hash` is emitted exactly as stored. For instances
        built by `from_text` that is the hex form, not the base64 form that
        `from_text` accepts, so the output is not directly re-parseable.
        Confirm which encoding callers expect.
        """
        return "\n".join(
            [self.origin, str(self.log_size), self.log_hash, *self.other_content]
        )
102
103
@dataclass(frozen=True)
class SignedNote:
    """
    Represents a "signed note" containing a note and its corresponding list of signatures.
    """

    # NOTE(review): `Field` is imported from pydantic, but this class is a
    # stdlib dataclass, so these expressions are just the attribute defaults.
    # Presumably a leftover from a pydantic model; confirm before relying on
    # constructing a `SignedNote` without arguments.
    note: StrictStr = Field(..., alias="note")
    signatures: list[RekorSignature] = Field(..., alias="signatures")

    @classmethod
    def from_text(cls, text: str) -> SignedNote:
        """
        Deserialize from a bundled text 'note'.

        The text consists of:
        - the note body (the signed text),
        - a separator blank line,
        - and signature(s), each on its own newline-terminated line of the
          form `<em dash> NAME SIGNATURE` (em dash is U+2014), where
          SIGNATURE is base64 of a 4-byte key hint followed by the signature.

        Raises `VerificationError` if the text does not contain exactly one
        blank-line separator, if the signature block is empty or does not end
        with a newline, if a signature decodes to fewer than 5 bytes, or if no
        signature line could be parsed at all. Invalid base64 in a signature
        propagates the error raised by `base64.b64decode`.

        This is derived from Rekor's `UnmarshalText`:
        <https://github.com/sigstore/rekor/blob/4b1fa6661cc6dfbc844b4c6ed9b1f44e7c5ae1c0/pkg/util/signed_note.go#L141>
        """

        separator: str = "\n\n"
        if text.count(separator) != 1:
            raise VerificationError(
                "note must contain one blank line, delineating the text from the signature block"
            )
        split = text.index(separator)

        # Keep the first newline of the separator: the header, including its
        # trailing newline, is the byte string `verify` checks signatures over.
        header: str = text[: split + 1]
        data: str = text[split + len(separator) :]

        if len(data) == 0:
            raise VerificationError(
                "malformed Note: must contain at least one signature"
            )
        if data[-1] != "\n":
            raise VerificationError(
                "malformed Note: data section must end with newline"
            )

        sig_parser = re.compile(r"\u2014 (\S+) (\S+)\n")
        signatures: list[RekorSignature] = []
        for name, encoded_signature in sig_parser.findall(data):
            signature_bytes: bytes = base64.b64decode(encoded_signature)
            # 4 bytes of key hint plus at least one byte of signature.
            if len(signature_bytes) < 5:
                raise VerificationError(
                    "malformed Note: signature contains too few bytes"
                )

            signatures.append(
                RekorSignature(
                    name=name,
                    sig_hash=signature_bytes[:4],
                    signature=base64.b64encode(signature_bytes[4:]),
                )
            )

        # A non-empty data section whose lines all fail to match the signature
        # pattern would otherwise yield a note with nothing to verify.
        if not signatures:
            raise VerificationError(
                "malformed Note: must contain at least one signature"
            )

        return cls(note=header, signatures=signatures)

    def verify(self, rekor_keyring: RekorKeyring, key_id: KeyID) -> None:
        """
        Verify the `SignedNote` using the given RekorKeyring by verifying
        each contained signature.

        Every signature must carry a key hint equal to the first 4 bytes of
        `key_id` and must verify over the encoded note text.

        Raises `VerificationError` if there are no signatures, if a key hint
        does not match, or if the keyring rejects a signature.
        """

        # Refuse to "succeed" without having checked any signature.
        if not self.signatures:
            raise VerificationError("checkpoint: note contains no signatures")

        note = self.note.encode()

        for sig in self.signatures:
            if sig.sig_hash != key_id[:4]:
                raise VerificationError(
                    "checkpoint: sig_hash hint does not match expected key_id"
                )

            try:
                rekor_keyring.verify(
                    key_id=key_id, signature=base64.b64decode(sig.signature), data=note
                )
            except VerificationError as sig_err:
                # Chain the cause so the keyring's original traceback is kept.
                raise VerificationError(
                    f"checkpoint: invalid signature: {sig_err}"
                ) from sig_err
186
187
@dataclass(frozen=True)
class SignedCheckpoint:
    """
    Pairs a parsed `LogCheckpoint` with the `SignedNote` it was read from,
    i.e. a checkpoint together with its signatures.
    """

    signed_note: SignedNote
    checkpoint: LogCheckpoint

    @classmethod
    def from_text(cls, text: str) -> SignedCheckpoint:
        """
        Build a `SignedCheckpoint` from its textual form.

        The text is first parsed as a `SignedNote`; the note body of that
        result is then parsed as a `LogCheckpoint`.
        """

        note = SignedNote.from_text(text)
        return cls(
            signed_note=note,
            checkpoint=LogCheckpoint.from_text(note.note),
        )
206
207
def verify_checkpoint(rekor_keyring: RekorKeyring, entry: LogEntry) -> None:
    """
    Verify the inclusion proof's checkpoint.

    The checkpoint text is taken from `entry.inclusion_proof`, its signatures
    are verified against `rekor_keyring` using the key ID derived from the
    hex-encoded `entry.log_id`, and its root hash is compared with the root
    hash recorded in the inclusion proof.

    Raises `VerificationError` if the entry has no inclusion proof, if the
    checkpoint cannot be parsed or its signature does not verify, or if the
    two root hashes differ.
    """

    inclusion_proof = entry.inclusion_proof
    if inclusion_proof is None:
        raise VerificationError("Rekor entry has no inclusion proof")

    # verification occurs in two stages:
    # 1) verify the signature on the checkpoint
    # 2) verify the root hash in the checkpoint matches the root hash from the inclusion proof.
    signed_checkpoint = SignedCheckpoint.from_text(inclusion_proof.checkpoint)
    signed_checkpoint.signed_note.verify(
        rekor_keyring, KeyID(bytes.fromhex(entry.log_id))
    )

    checkpoint_hash = signed_checkpoint.checkpoint.log_hash
    root_hash = inclusion_proof.root_hash

    if checkpoint_hash != root_hash:
        # Adjacent literals form a single message string; a comma between them
        # would pass two separate positional arguments to the exception.
        raise VerificationError(
            "Inclusion proof contains invalid root hash signature: "
            f"expected {checkpoint_hash} got {root_hash}"
        )