Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/rekor/client.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

113 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Client implementation for interacting with Rekor (v1). 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import json 

23import logging 

24from abc import ABC 

25from dataclasses import dataclass 

26from typing import Any 

27 

28import rekor_types 

29import requests 

30from cryptography.hazmat.primitives import serialization 

31from cryptography.x509 import Certificate 

32 

33from sigstore._internal import USER_AGENT 

34from sigstore._internal.rekor import ( 

35 EntryRequestBody, 

36 RekorClientError, 

37 RekorLogSubmitter, 

38) 

39from sigstore.dsse import Envelope 

40from sigstore.hashes import Hashed 

41from sigstore.models import TransparencyLogEntry 

42 

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Base URL of the production Rekor instance (used by `RekorClient.production`).
DEFAULT_REKOR_URL = "https://rekor.sigstore.dev"
# Base URL of the staging Rekor instance (used by `RekorClient.staging`).
STAGING_REKOR_URL = "https://rekor.sigstage.dev"

47 

48 

@dataclass(frozen=True)
class RekorLogInfo:
    """
    Immutable snapshot of the log information reported by a Rekor instance.
    """

    root_hash: str
    tree_size: int
    signed_tree_head: str
    tree_id: str
    raw_data: dict[str, Any]

    @classmethod
    def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:
        """
        Build a `RekorLogInfo` out of a decoded log-info API response.

        The `rootHash`, `treeSize`, `signedTreeHead` and `treeID` keys are
        required (a missing key raises `KeyError`). The response mapping
        itself is retained, unmodified, as `raw_data`.
        """
        root_hash = dict_["rootHash"]
        tree_size = dict_["treeSize"]
        signed_tree_head = dict_["signedTreeHead"]
        tree_id = dict_["treeID"]
        # Positional order mirrors the field declaration order above.
        return cls(root_hash, tree_size, signed_tree_head, tree_id, dict_)

73 

74 

75class _Endpoint(ABC): 

76 def __init__(self, url: str, session: requests.Session | None = None) -> None: 

77 # Note that _Endpoint may not be thread be safe if the same Session is provided 

78 # to an _Endpoint in multiple threads 

79 self.url = url 

80 if session is None: 

81 session = requests.Session() 

82 session.headers.update( 

83 { 

84 "Content-Type": "application/json", 

85 "Accept": "application/json", 

86 "User-Agent": USER_AGENT, 

87 } 

88 ) 

89 

90 self.session = session 

91 

92 

class RekorLog(_Endpoint):
    """
    The log endpoint of a Rekor instance.
    """

    def get(self) -> RekorLogInfo:
        """
        Fetch the log information from the Rekor instance.

        Raises `RekorClientError` if the server answers with an HTTP error
        status.
        """
        response: requests.Response = self.session.get(self.url)
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            raise RekorClientError(exc)

        log_state = response.json()
        return RekorLogInfo.from_response(log_state)

    @property
    def entries(self) -> RekorEntries:
        """
        A `RekorEntries` adapter rooted at this log's `entries` path, sharing
        this endpoint's session, for accessing individual log entries.
        """
        entries_url = f"{self.url}/entries"
        return RekorEntries(entries_url, session=self.session)

116 

117 

class RekorEntries(_Endpoint):
    """
    Represents the individual log entry endpoints on a Rekor instance.
    """

    def get(
        self, *, uuid: str | None = None, log_index: int | None = None
    ) -> TransparencyLogEntry:
        """
        Retrieve a specific log entry, either by UUID or by log index.

        Exactly one of `uuid` and `log_index` must be given. A log index of
        `0` is accepted as a selector; an empty `uuid` counts as not given.

        Raises `ValueError` if neither or both selectors are supplied, and
        `RekorClientError` if the server responds with an HTTP error status.
        """
        # `log_index` is compared against None rather than tested for
        # truthiness, so that index 0 is not mistaken for "not provided".
        by_uuid = bool(uuid)
        by_index = log_index is not None
        if by_uuid == by_index:
            raise ValueError("uuid or log_index required, but not both")

        resp: requests.Response

        # Branch on the same flags used for validation so that an empty
        # `uuid` can never produce a request for `{url}/`.
        if by_uuid:
            resp = self.session.get(f"{self.url}/{uuid}")
        else:
            resp = self.session.get(self.url, params={"logIndex": log_index})

        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise RekorClientError(http_error) from http_error
        return TransparencyLogEntry._from_v1_response(resp.json())

    def post(
        self,
        payload: EntryRequestBody,
    ) -> TransparencyLogEntry:
        """
        Submit a new entry for inclusion in the Rekor log.

        `payload` is sent as the JSON request body. Returns the entry parsed
        from the server's response.

        Raises `RekorClientError` if the server responds with an HTTP error
        status.
        """

        _logger.debug(f"proposed: {json.dumps(payload)}")

        resp: requests.Response = self.session.post(self.url, json=payload)
        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise RekorClientError(http_error) from http_error

        integrated_entry = resp.json()
        _logger.debug(f"integrated: {integrated_entry}")
        return TransparencyLogEntry._from_v1_response(integrated_entry)

    @property
    def retrieve(self) -> RekorEntriesRetrieve:
        """
        Returns a `RekorEntriesRetrieve` capable of retrieving entries.

        The returned adapter shares this endpoint's session.
        """
        return RekorEntriesRetrieve(f"{self.url}/retrieve/", session=self.session)

173 

174 

class RekorEntriesRetrieve(_Endpoint):
    """
    Represents the entry retrieval endpoints on a Rekor instance.
    """

    def post(
        self,
        expected_entry: rekor_types.Hashedrekord | rekor_types.Dsse,
    ) -> TransparencyLogEntry | None:
        """
        Retrieves an extant Rekor entry, identified by its artifact signature,
        artifact hash, and signing certificate.

        Returns None if Rekor has no entry corresponding to the signing
        materials (an HTTP 404 response, or an empty result list). When
        several entries match, the one with the smallest integrated time is
        returned.

        Raises `RekorClientError` for any other HTTP error status, and
        `ValueError` if a returned entry has no integrated time.
        """
        data = {"entries": [expected_entry.model_dump(mode="json", by_alias=True)]}

        resp: requests.Response = self.session.post(self.url, json=data)
        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            # NOTE: compare against None explicitly. A `requests.Response` is
            # falsy whenever its status code is 4xx/5xx, so a plain truthiness
            # check would make this 404 branch unreachable.
            if (
                http_error.response is not None
                and http_error.response.status_code == 404
            ):
                return None
            raise RekorClientError(http_error) from http_error

        results = resp.json()

        # The response is a list of `{uuid: LogEntry}` objects.
        # We select the oldest entry for our actual return value,
        # since a malicious actor could conceivably spam the log with
        # newer duplicate entries.
        oldest_entry: TransparencyLogEntry | None = None
        for result in results:
            entry = TransparencyLogEntry._from_v1_response(result)

            # We expect every entry in Rekor v1 to have an integrated time.
            if entry._inner.integrated_time is None:
                raise ValueError(
                    f"Rekor v1 gave us an entry without an integrated time: {entry._inner.log_index}"
                )

            # Strict `<` keeps the first-seen entry when integrated times tie.
            if (
                oldest_entry is None
                or entry._inner.integrated_time < oldest_entry._inner.integrated_time  # type: ignore[operator]
            ):
                oldest_entry = entry

        return oldest_entry

224 

225 

class RekorClient(RekorLogSubmitter):
    """The internal Rekor client"""

    def __init__(self, url: str) -> None:
        """
        Create a `RekorClient` for the Rekor instance at `url`.

        `/api/v1` is appended to `url` to form the API base URL.
        """
        self.url = f"{url}/api/v1"

    @classmethod
    def production(cls) -> RekorClient:
        """
        Construct a `RekorClient` pointed at the default production Rekor instance.
        """
        return cls(DEFAULT_REKOR_URL)

    @classmethod
    def staging(cls) -> RekorClient:
        """
        Construct a `RekorClient` pointed at the default staging Rekor instance.
        """
        return cls(STAGING_REKOR_URL)

    @property
    def log(self) -> RekorLog:
        """
        A `RekorLog` adapter for making requests to this instance's log.

        A new adapter is created on every access.
        """
        log_url = f"{self.url}/log"
        return RekorLog(log_url)

    def create_entry(self, request: EntryRequestBody) -> TransparencyLogEntry:
        """
        Submit `request` to Rekor and return the resulting log entry.
        """
        entries = self.log.entries
        return entries.post(request)

    def _build_hashed_rekord_request(  # type: ignore[override]
        self, hashed_input: Hashed, signature: bytes, certificate: Certificate
    ) -> EntryRequestBody:
        """
        Build the hashedrekord request body for the given digest, signature
        and signing certificate.

        The signature and the PEM-encoded certificate are both base64-encoded;
        the digest is hex-encoded.
        """
        encoded_signature = base64.b64encode(signature).decode()
        certificate_pem = certificate.public_bytes(
            encoding=serialization.Encoding.PEM
        )
        encoded_certificate = base64.b64encode(certificate_pem).decode()

        signature_model = rekor_types.hashedrekord.Signature(
            content=encoded_signature,
            public_key=rekor_types.hashedrekord.PublicKey(
                content=encoded_certificate
            ),
        )
        hash_model = rekor_types.hashedrekord.Hash(
            algorithm=hashed_input._as_hashedrekord_algorithm(),
            value=hashed_input.digest.hex(),
        )
        spec = rekor_types.hashedrekord.HashedrekordV001Schema(
            signature=signature_model,
            data=rekor_types.hashedrekord.Data(hash=hash_model),
        )

        rekord = rekor_types.Hashedrekord(spec=spec)
        return EntryRequestBody(rekord.model_dump(mode="json", by_alias=True))

    def _build_dsse_request(  # type: ignore[override]
        self, envelope: Envelope, certificate: Certificate
    ) -> EntryRequestBody:
        """
        Build the dsse request body for the given envelope and signing
        certificate.

        The envelope is embedded via `envelope.to_json()`; the PEM-encoded
        certificate is base64-encoded and sent as the sole verifier.
        """
        envelope_json = envelope.to_json()
        certificate_pem = certificate.public_bytes(
            encoding=serialization.Encoding.PEM
        )
        encoded_certificate = base64.b64encode(certificate_pem).decode()

        # NOTE: mypy can't see that the `envelope` kwarg is correct due to two
        # interacting behaviors/bugs (one pydantic, one datamodel-codegen):
        # See: <https://github.com/pydantic/pydantic/discussions/7418#discussioncomment-9024927>
        # See: <https://github.com/koxudaxi/datamodel-code-generator/issues/1903>
        proposed_content = rekor_types.dsse.ProposedContent(  # type: ignore[call-arg]
            envelope=envelope_json,
            verifiers=[encoded_certificate],
        )
        spec = rekor_types.dsse.DsseSchema(proposed_content=proposed_content)

        dsse = rekor_types.Dsse(spec=spec)
        return EntryRequestBody(dsse.model_dump(mode="json", by_alias=True))