Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/rekor/client.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

119 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Client implementation for interacting with Rekor (v1). 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import json 

23import logging 

24import threading 

25from abc import ABC 

26from dataclasses import dataclass 

27from typing import Any 

28 

29import rekor_types 

30import requests 

31from cryptography.hazmat.primitives import serialization 

32from cryptography.x509 import Certificate 

33 

34from sigstore._internal import USER_AGENT 

35from sigstore._internal.rekor import ( 

36 EntryRequestBody, 

37 RekorClientError, 

38 RekorLogSubmitter, 

39) 

40from sigstore.dsse import Envelope 

41from sigstore.hashes import Hashed 

42from sigstore.models import TransparencyLogEntry 

43 

44_logger = logging.getLogger(__name__) 

45 

46DEFAULT_REKOR_URL = "https://rekor.sigstore.dev" 

47STAGING_REKOR_URL = "https://rekor.sigstage.dev" 

48 

49 

50@dataclass(frozen=True) 

51class RekorLogInfo: 

52 """ 

53 Represents information about the Rekor log. 

54 """ 

55 

56 root_hash: str 

57 tree_size: int 

58 signed_tree_head: str 

59 tree_id: str 

60 raw_data: dict[str, Any] 

61 

62 @classmethod 

63 def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo: 

64 """ 

65 Create a new `RekorLogInfo` from the given API response. 

66 """ 

67 return cls( 

68 root_hash=dict_["rootHash"], 

69 tree_size=dict_["treeSize"], 

70 signed_tree_head=dict_["signedTreeHead"], 

71 tree_id=dict_["treeID"], 

72 raw_data=dict_, 

73 ) 

74 

75 

76class _Endpoint(ABC): 

77 def __init__(self, url: str, session: requests.Session) -> None: 

78 # Note that _Endpoint may not be thread safe if the same Session is provided 

79 # to an _Endpoint in multiple threads 

80 self.url = url 

81 self.session = session 

82 

83 

84class RekorLog(_Endpoint): 

85 """ 

86 Represents a Rekor instance's log endpoint. 

87 """ 

88 

89 def get(self) -> RekorLogInfo: 

90 """ 

91 Returns information about the Rekor instance's log. 

92 """ 

93 resp: requests.Response = self.session.get(self.url) 

94 try: 

95 resp.raise_for_status() 

96 except requests.HTTPError as http_error: 

97 raise RekorClientError(http_error) 

98 return RekorLogInfo.from_response(resp.json()) 

99 

100 @property 

101 def entries(self) -> RekorEntries: 

102 """ 

103 Returns a `RekorEntries` capable of accessing detailed information 

104 about individual log entries. 

105 """ 

106 return RekorEntries(f"{self.url}/entries", session=self.session) 

107 

108 

109class RekorEntries(_Endpoint): 

110 """ 

111 Represents the individual log entry endpoints on a Rekor instance. 

112 """ 

113 

114 def get( 

115 self, *, uuid: str | None = None, log_index: int | None = None 

116 ) -> TransparencyLogEntry: 

117 """ 

118 Retrieve a specific log entry, either by UUID or by log index. 

119 

120 Either `uuid` or `log_index` must be present, but not both. 

121 """ 

122 if not (bool(uuid) ^ bool(log_index)): 

123 raise ValueError("uuid or log_index required, but not both") 

124 

125 resp: requests.Response 

126 

127 if uuid is not None: 

128 resp = self.session.get(f"{self.url}/{uuid}") 

129 else: 

130 resp = self.session.get(self.url, params={"logIndex": log_index}) 

131 

132 try: 

133 resp.raise_for_status() 

134 except requests.HTTPError as http_error: 

135 raise RekorClientError(http_error) 

136 return TransparencyLogEntry._from_v1_response(resp.json()) 

137 

138 def post( 

139 self, 

140 payload: EntryRequestBody, 

141 ) -> TransparencyLogEntry: 

142 """ 

143 Submit a new entry for inclusion in the Rekor log. 

144 """ 

145 

146 _logger.debug(f"proposed: {json.dumps(payload)}") 

147 

148 resp: requests.Response = self.session.post(self.url, json=payload) 

149 try: 

150 resp.raise_for_status() 

151 except requests.HTTPError as http_error: 

152 raise RekorClientError(http_error) 

153 

154 integrated_entry = resp.json() 

155 _logger.debug(f"integrated: {integrated_entry}") 

156 return TransparencyLogEntry._from_v1_response(integrated_entry) 

157 

158 @property 

159 def retrieve(self) -> RekorEntriesRetrieve: 

160 """ 

161 Returns a `RekorEntriesRetrieve` capable of retrieving entries. 

162 """ 

163 return RekorEntriesRetrieve(f"{self.url}/retrieve/", session=self.session) 

164 

165 

166class RekorEntriesRetrieve(_Endpoint): 

167 """ 

168 Represents the entry retrieval endpoints on a Rekor instance. 

169 """ 

170 

171 def post( 

172 self, 

173 expected_entry: rekor_types.Hashedrekord | rekor_types.Dsse, 

174 ) -> TransparencyLogEntry | None: 

175 """ 

176 Retrieves an extant Rekor entry, identified by its artifact signature, 

177 artifact hash, and signing certificate. 

178 

179 Returns None if Rekor has no entry corresponding to the signing 

180 materials. 

181 """ 

182 data = {"entries": [expected_entry.model_dump(mode="json", by_alias=True)]} 

183 

184 resp: requests.Response = self.session.post(self.url, json=data) 

185 try: 

186 resp.raise_for_status() 

187 except requests.HTTPError as http_error: 

188 if http_error.response and http_error.response.status_code == 404: 

189 return None 

190 raise RekorClientError(http_error) 

191 

192 results = resp.json() 

193 

194 # The response is a list of `{uuid: LogEntry}` objects. 

195 # We select the oldest entry for our actual return value, 

196 # since a malicious actor could conceivably spam the log with 

197 # newer duplicate entries. 

198 oldest_entry: TransparencyLogEntry | None = None 

199 for result in results: 

200 entry = TransparencyLogEntry._from_v1_response(result) 

201 

202 # We expect every entry in Rekor v1 to have an integrated time. 

203 if entry._inner.integrated_time is None: 

204 raise ValueError( 

205 f"Rekor v1 gave us an entry without an integrated time: {entry._inner.log_index}" 

206 ) 

207 

208 if ( 

209 oldest_entry is None 

210 or entry._inner.integrated_time < oldest_entry._inner.integrated_time # type: ignore[operator] 

211 ): 

212 oldest_entry = entry 

213 

214 return oldest_entry 

215 

216 

217class RekorClient(RekorLogSubmitter): 

218 """The internal Rekor client""" 

219 

220 def __init__(self, url: str) -> None: 

221 """ 

222 Create a new `RekorClient` from the given URL. 

223 """ 

224 self.url = f"{url}/api/v1" 

225 self._thread_local = threading.local() 

226 

227 @classmethod 

228 def production(cls) -> RekorClient: 

229 """ 

230 Returns a `RekorClient` populated with the default Rekor production instance. 

231 """ 

232 return cls( 

233 DEFAULT_REKOR_URL, 

234 ) 

235 

236 @classmethod 

237 def staging(cls) -> RekorClient: 

238 """ 

239 Returns a `RekorClient` populated with the default Rekor staging instance. 

240 """ 

241 return cls(STAGING_REKOR_URL) 

242 

243 @property 

244 def _session(self) -> requests.Session: 

245 """ 

246 Lazy-initialized thread-local session object 

247 """ 

248 if not hasattr(self._thread_local, "session"): 

249 session = requests.Session() 

250 session.headers.update( 

251 { 

252 "Content-Type": "application/json", 

253 "Accept": "application/json", 

254 "User-Agent": USER_AGENT, 

255 } 

256 ) 

257 self._thread_local.session = session 

258 return self._thread_local.session # type: ignore[no-any-return] 

259 

260 @property 

261 def log(self) -> RekorLog: 

262 """ 

263 Returns a `RekorLog` adapter for making requests to a Rekor log. 

264 """ 

265 

266 return RekorLog(f"{self.url}/log", session=self._session) 

267 

268 def create_entry(self, request: EntryRequestBody) -> TransparencyLogEntry: 

269 """ 

270 Submit the request to Rekor. 

271 

272 create_entry() can be called from multiple threads. 

273 """ 

274 return self.log.entries.post(request) 

275 

276 def _build_hashed_rekord_request( # type: ignore[override] 

277 self, hashed_input: Hashed, signature: bytes, certificate: Certificate 

278 ) -> EntryRequestBody: 

279 """ 

280 Construct a hashed rekord payload to submit to Rekor. 

281 """ 

282 rekord = rekor_types.Hashedrekord( 

283 spec=rekor_types.hashedrekord.HashedrekordV001Schema( 

284 signature=rekor_types.hashedrekord.Signature( 

285 content=base64.b64encode(signature).decode(), 

286 public_key=rekor_types.hashedrekord.PublicKey( 

287 content=base64.b64encode( 

288 certificate.public_bytes( 

289 encoding=serialization.Encoding.PEM 

290 ) 

291 ).decode() 

292 ), 

293 ), 

294 data=rekor_types.hashedrekord.Data( 

295 hash=rekor_types.hashedrekord.Hash( 

296 algorithm=hashed_input._as_hashedrekord_algorithm(), 

297 value=hashed_input.digest.hex(), 

298 ) 

299 ), 

300 ), 

301 ) 

302 return EntryRequestBody(rekord.model_dump(mode="json", by_alias=True)) 

303 

304 def _build_dsse_request( # type: ignore[override] 

305 self, envelope: Envelope, certificate: Certificate 

306 ) -> EntryRequestBody: 

307 """ 

308 Construct a dsse request to submit to Rekor. 

309 """ 

310 dsse = rekor_types.Dsse( 

311 spec=rekor_types.dsse.DsseSchema( 

312 # NOTE: mypy can't see that this kwarg is correct due to two interacting 

313 # behaviors/bugs (one pydantic, one datamodel-codegen): 

314 # See: <https://github.com/pydantic/pydantic/discussions/7418#discussioncomment-9024927> 

315 # See: <https://github.com/koxudaxi/datamodel-code-generator/issues/1903> 

316 proposed_content=rekor_types.dsse.ProposedContent( # type: ignore[call-arg] 

317 envelope=envelope.to_json(), 

318 verifiers=[ 

319 base64.b64encode( 

320 certificate.public_bytes( 

321 encoding=serialization.Encoding.PEM 

322 ) 

323 ).decode() 

324 ], 

325 ), 

326 ), 

327 ) 

328 return EntryRequestBody(dsse.model_dump(mode="json", by_alias=True))