Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/rekor/checkpoint.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

92 statements  

1# Copyright 2023 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Rekor Checkpoint machinery. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import re 

23import struct 

24import typing 

25from dataclasses import dataclass 

26 

27from pydantic import BaseModel, Field, StrictStr 

28 

29from sigstore._utils import KeyID 

30from sigstore.errors import VerificationError 

31 

32if typing.TYPE_CHECKING: 

33 from sigstore._internal.trust import RekorKeyring 

34 from sigstore.models import TransparencyLogEntry 

35 

36 

37@dataclass(frozen=True) 

38class RekorSignature: 

39 """ 

40 Represents a `RekorSignature` containing: 

41 

42 - the name of the signature, e.g. "rekor.sigstage.dev" 

43 - the signature hash 

44 - the base64 signature 

45 """ 

46 

47 name: str 

48 sig_hash: bytes 

49 signature: bytes 

50 

51 

52class LogCheckpoint(BaseModel): 

53 """ 

54 Represents a Rekor `LogCheckpoint` containing: 

55 

56 - an origin, e.g. "rekor.sigstage.dev - 8050909264565447525" 

57 - the size of the log, 

58 - the hash of the log, 

59 - and any optional ancillary constants, e.g. "Timestamp: 1679349379012118479" 

60 

61 See: <https://github.com/transparency-dev/formats/blob/main/log/README.md> 

62 """ 

63 

64 origin: StrictStr 

65 log_size: int 

66 log_hash: StrictStr 

67 other_content: list[str] 

68 

69 @classmethod 

70 def from_text(cls, text: str) -> LogCheckpoint: 

71 """ 

72 Serialize from the text header ("note") of a SignedNote. 

73 """ 

74 

75 lines = text.strip().split("\n") 

76 if len(lines) < 3: 

77 raise VerificationError("malformed LogCheckpoint: too few items in header") 

78 

79 origin = lines[0] 

80 if len(origin) == 0: 

81 raise VerificationError("malformed LogCheckpoint: empty origin") 

82 

83 log_size = int(lines[1]) 

84 root_hash = base64.b64decode(lines[2]).hex() 

85 

86 return LogCheckpoint( 

87 origin=origin, 

88 log_size=log_size, 

89 log_hash=root_hash, 

90 other_content=lines[3:], 

91 ) 

92 

93 @classmethod 

94 def to_text(self) -> str: 

95 """ 

96 Serialize a `LogCheckpoint` into text format. 

97 See class definition for a prose description of the format. 

98 """ 

99 return "\n".join( 

100 [self.origin, str(self.log_size), self.log_hash, *self.other_content] 

101 ) 

102 

103 

104@dataclass(frozen=True) 

105class SignedNote: 

106 """ 

107 Represents a "signed note" containing a note and its corresponding list of signatures. 

108 """ 

109 

110 note: StrictStr = Field(..., alias="note") 

111 signatures: list[RekorSignature] = Field(..., alias="signatures") 

112 

113 @classmethod 

114 def from_text(cls, text: str) -> SignedNote: 

115 """ 

116 Deserialize from a bundled text 'note'. 

117 

118 A note contains: 

119 - a name, a string associated with the signer, 

120 - a separator blank line, 

121 - and signature(s), each signature takes the form 

122 `\u2014 NAME SIGNATURE\n` 

123 (where \u2014 == em dash). 

124 

125 This is derived from Rekor's `UnmarshalText`: 

126 <https://github.com/sigstore/rekor/blob/4b1fa6661cc6dfbc844b4c6ed9b1f44e7c5ae1c0/pkg/util/signed_note.go#L141> 

127 """ 

128 

129 separator: str = "\n\n" 

130 if text.count(separator) != 1: 

131 raise VerificationError( 

132 "note must contain one blank line, delineating the text from the signature block" 

133 ) 

134 split = text.index(separator) 

135 

136 header: str = text[: split + 1] 

137 data: str = text[split + len(separator) :] 

138 

139 if len(data) == 0: 

140 raise VerificationError( 

141 "malformed Note: must contain at least one signature" 

142 ) 

143 if data[-1] != "\n": 

144 raise VerificationError( 

145 "malformed Note: data section must end with newline" 

146 ) 

147 

148 sig_parser = re.compile(r"\u2014 (\S+) (\S+)\n") 

149 signatures: list[RekorSignature] = [] 

150 for name, signature in re.findall(sig_parser, data): 

151 signature_bytes: bytes = base64.b64decode(signature) 

152 if len(signature_bytes) < 5: 

153 raise VerificationError( 

154 "malformed Note: signature contains too few bytes" 

155 ) 

156 

157 signature = RekorSignature( 

158 name=name, 

159 sig_hash=struct.unpack(">4s", signature_bytes[0:4])[0], 

160 signature=base64.b64encode(signature_bytes[4:]), 

161 ) 

162 signatures.append(signature) 

163 

164 return cls(note=header, signatures=signatures) 

165 

166 def verify(self, rekor_keyring: RekorKeyring, key_id: KeyID) -> None: 

167 """ 

168 Verify the `SignedNote` using the given RekorKeyring and KeyID. 

169 """ 

170 

171 note = str.encode(self.note) 

172 

173 for sig in self.signatures: 

174 if sig.sig_hash == key_id[:4]: 

175 try: 

176 rekor_keyring.verify( 

177 key_id=key_id, 

178 signature=base64.b64decode(sig.signature), 

179 data=note, 

180 ) 

181 return 

182 except VerificationError as sig_err: 

183 raise VerificationError(f"checkpoint: invalid signature: {sig_err}") 

184 

185 raise VerificationError( 

186 f"checkpoint: Signature not found for log ID {key_id.hex()}" 

187 ) 

188 

189 

190@dataclass(frozen=True) 

191class SignedCheckpoint: 

192 """ 

193 Represents a *signed* `Checkpoint`: a `LogCheckpoint` and its corresponding `SignedNote`. 

194 """ 

195 

196 signed_note: SignedNote 

197 checkpoint: LogCheckpoint 

198 

199 @classmethod 

200 def from_text(cls, text: str) -> SignedCheckpoint: 

201 """ 

202 Create a new `SignedCheckpoint` from the text representation. 

203 """ 

204 

205 signed_note = SignedNote.from_text(text) 

206 checkpoint = LogCheckpoint.from_text(signed_note.note) 

207 return cls(signed_note=signed_note, checkpoint=checkpoint) 

208 

209 

210def verify_checkpoint(rekor_keyring: RekorKeyring, entry: TransparencyLogEntry) -> None: 

211 """ 

212 Verify the inclusion proof's checkpoint. 

213 """ 

214 

215 inclusion_proof = entry._inner.inclusion_proof 

216 if inclusion_proof.checkpoint is None: 

217 raise VerificationError("Inclusion proof does not contain a checkpoint") 

218 

219 # verification occurs in two stages: 

220 # 1) verify the signature on the checkpoint 

221 # 2) verify the root hash in the checkpoint matches the root hash from the inclusion proof. 

222 signed_checkpoint = SignedCheckpoint.from_text(inclusion_proof.checkpoint.envelope) 

223 signed_checkpoint.signed_note.verify( 

224 rekor_keyring, 

225 KeyID(entry._inner.log_id.key_id), 

226 ) 

227 

228 checkpoint_hash = signed_checkpoint.checkpoint.log_hash 

229 root_hash = inclusion_proof.root_hash.hex() 

230 

231 if checkpoint_hash != root_hash: 

232 raise VerificationError( 

233 "Inclusion proof contains invalid root hash signature: ", 

234 f"expected {checkpoint_hash} got {root_hash}", 

235 )