Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/rekor/checkpoint.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1# Copyright 2023 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Rekor Checkpoint machinery. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import re 

23import struct 

24import typing 

25from dataclasses import dataclass 

26 

27from pydantic import BaseModel, Field, StrictStr 

28 

29from sigstore._utils import KeyID 

30from sigstore.errors import VerificationError 

31 

32if typing.TYPE_CHECKING: 

33 from sigstore._internal.trust import RekorKeyring 

34 from sigstore.models import LogEntry 

35 

36 

37@dataclass(frozen=True) 

38class RekorSignature: 

39 """ 

40 Represents a `RekorSignature` containing: 

41 

42 - the name of the signature, e.g. "rekor.sigstage.dev" 

43 - the signature hash 

44 - the base64 signature 

45 """ 

46 

47 name: str 

48 sig_hash: bytes 

49 signature: bytes 

50 

51 

52class LogCheckpoint(BaseModel): 

53 """ 

54 Represents a Rekor `LogCheckpoint` containing: 

55 

56 - an origin, e.g. "rekor.sigstage.dev - 8050909264565447525" 

57 - the size of the log, 

58 - the hash of the log, 

59 - and any optional ancillary constants, e.g. "Timestamp: 1679349379012118479" 

60 

61 See: <https://github.com/transparency-dev/formats/blob/main/log/README.md> 

62 """ 

63 

64 origin: StrictStr 

65 log_size: int 

66 log_hash: StrictStr 

67 other_content: list[str] 

68 

69 @classmethod 

70 def from_text(cls, text: str) -> LogCheckpoint: 

71 """ 

72 Serialize from the text header ("note") of a SignedNote. 

73 """ 

74 

75 lines = text.strip().split("\n") 

76 if len(lines) < 3: 

77 raise VerificationError("malformed LogCheckpoint: too few items in header") 

78 

79 origin = lines[0] 

80 if len(origin) == 0: 

81 raise VerificationError("malformed LogCheckpoint: empty origin") 

82 

83 log_size = int(lines[1]) 

84 root_hash = base64.b64decode(lines[2]).hex() 

85 

86 return LogCheckpoint( 

87 origin=origin, 

88 log_size=log_size, 

89 log_hash=root_hash, 

90 other_content=lines[3:], 

91 ) 

92 

93 @classmethod 

94 def to_text(self) -> str: 

95 """ 

96 Serialize a `LogCheckpoint` into text format. 

97 See class definition for a prose description of the format. 

98 """ 

99 return "\n".join( 

100 [self.origin, str(self.log_size), self.log_hash, *self.other_content] 

101 ) 

102 

103 

104@dataclass(frozen=True) 

105class SignedNote: 

106 """ 

107 Represents a "signed note" containing a note and its corresponding list of signatures. 

108 """ 

109 

110 note: StrictStr = Field(..., alias="note") 

111 signatures: list[RekorSignature] = Field(..., alias="signatures") 

112 

113 @classmethod 

114 def from_text(cls, text: str) -> SignedNote: 

115 """ 

116 Deserialize from a bundled text 'note'. 

117 

118 A note contains: 

119 - a name, a string associated with the signer, 

120 - a separator blank line, 

121 - and signature(s), each signature takes the form 

122 `\u2014 NAME SIGNATURE\n` 

123 (where \u2014 == em dash). 

124 

125 This is derived from Rekor's `UnmarshalText`: 

126 <https://github.com/sigstore/rekor/blob/4b1fa6661cc6dfbc844b4c6ed9b1f44e7c5ae1c0/pkg/util/signed_note.go#L141> 

127 """ 

128 

129 separator: str = "\n\n" 

130 if text.count(separator) != 1: 

131 raise VerificationError( 

132 "note must contain one blank line, delineating the text from the signature block" 

133 ) 

134 split = text.index(separator) 

135 

136 header: str = text[: split + 1] 

137 data: str = text[split + len(separator) :] 

138 

139 if len(data) == 0: 

140 raise VerificationError( 

141 "malformed Note: must contain at least one signature" 

142 ) 

143 if data[-1] != "\n": 

144 raise VerificationError( 

145 "malformed Note: data section must end with newline" 

146 ) 

147 

148 sig_parser = re.compile(r"\u2014 (\S+) (\S+)\n") 

149 signatures: list[RekorSignature] = [] 

150 for name, signature in re.findall(sig_parser, data): 

151 signature_bytes: bytes = base64.b64decode(signature) 

152 if len(signature_bytes) < 5: 

153 raise VerificationError( 

154 "malformed Note: signature contains too few bytes" 

155 ) 

156 

157 signature = RekorSignature( 

158 name=name, 

159 sig_hash=struct.unpack(">4s", signature_bytes[0:4])[0], 

160 signature=base64.b64encode(signature_bytes[4:]), 

161 ) 

162 signatures.append(signature) 

163 

164 return cls(note=header, signatures=signatures) 

165 

166 def verify(self, rekor_keyring: RekorKeyring, key_id: KeyID) -> None: 

167 """ 

168 Verify the `SignedNote` using the given RekorKeyring by verifying 

169 each contained signature. 

170 """ 

171 

172 note = str.encode(self.note) 

173 

174 for sig in self.signatures: 

175 if sig.sig_hash != key_id[:4]: 

176 raise VerificationError( 

177 "checkpoint: sig_hash hint does not match expected key_id" 

178 ) 

179 

180 try: 

181 rekor_keyring.verify( 

182 key_id=key_id, signature=base64.b64decode(sig.signature), data=note 

183 ) 

184 except VerificationError as sig_err: 

185 raise VerificationError(f"checkpoint: invalid signature: {sig_err}") 

186 

187 

188@dataclass(frozen=True) 

189class SignedCheckpoint: 

190 """ 

191 Represents a *signed* `Checkpoint`: a `LogCheckpoint` and its corresponding `SignedNote`. 

192 """ 

193 

194 signed_note: SignedNote 

195 checkpoint: LogCheckpoint 

196 

197 @classmethod 

198 def from_text(cls, text: str) -> SignedCheckpoint: 

199 """ 

200 Create a new `SignedCheckpoint` from the text representation. 

201 """ 

202 

203 signed_note = SignedNote.from_text(text) 

204 checkpoint = LogCheckpoint.from_text(signed_note.note) 

205 return cls(signed_note=signed_note, checkpoint=checkpoint) 

206 

207 

208def verify_checkpoint(rekor_keyring: RekorKeyring, entry: LogEntry) -> None: 

209 """ 

210 Verify the inclusion proof's checkpoint. 

211 """ 

212 

213 inclusion_proof = entry.inclusion_proof 

214 if inclusion_proof is None: 

215 raise VerificationError("Rekor entry has no inclusion proof") 

216 

217 # verification occurs in two stages: 

218 # 1) verify the signature on the checkpoint 

219 # 2) verify the root hash in the checkpoint matches the root hash from the inclusion proof. 

220 signed_checkpoint = SignedCheckpoint.from_text(inclusion_proof.checkpoint) 

221 signed_checkpoint.signed_note.verify( 

222 rekor_keyring, KeyID(bytes.fromhex(entry.log_id)) 

223 ) 

224 

225 checkpoint_hash = signed_checkpoint.checkpoint.log_hash 

226 root_hash = inclusion_proof.root_hash 

227 

228 if checkpoint_hash != root_hash: 

229 raise VerificationError( 

230 "Inclusion proof contains invalid root hash signature: ", 

231 f"expected {checkpoint_hash} got {root_hash}", 

232 )