Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/rekor/client.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

109 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Client implementation for interacting with Rekor. 

17""" 

18 

19from __future__ import annotations 

20 

21import json 

22import logging 

23from abc import ABC 

24from dataclasses import dataclass 

25from typing import Any, Optional 

26from urllib.parse import urljoin 

27 

28import rekor_types 

29import requests 

30 

31from sigstore._internal import USER_AGENT 

32from sigstore.models import LogEntry 

33 

34_logger = logging.getLogger(__name__) 

35 

36DEFAULT_REKOR_URL = "https://rekor.sigstore.dev" 

37STAGING_REKOR_URL = "https://rekor.sigstage.dev" 

38 

39 

40@dataclass(frozen=True) 

41class RekorLogInfo: 

42 """ 

43 Represents information about the Rekor log. 

44 """ 

45 

46 root_hash: str 

47 tree_size: int 

48 signed_tree_head: str 

49 tree_id: str 

50 raw_data: dict[str, Any] 

51 

52 @classmethod 

53 def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo: 

54 """ 

55 Create a new `RekorLogInfo` from the given API response. 

56 """ 

57 return cls( 

58 root_hash=dict_["rootHash"], 

59 tree_size=dict_["treeSize"], 

60 signed_tree_head=dict_["signedTreeHead"], 

61 tree_id=dict_["treeID"], 

62 raw_data=dict_, 

63 ) 

64 

65 

66class RekorClientError(Exception): 

67 """ 

68 A generic error in the Rekor client. 

69 """ 

70 

71 def __init__(self, http_error: requests.HTTPError): 

72 """ 

73 Create a new `RekorClientError` from the given `requests.HTTPError`. 

74 """ 

75 if http_error.response is not None: 

76 try: 

77 error = rekor_types.Error.model_validate_json(http_error.response.text) 

78 super().__init__(f"{error.code}: {error.message}") 

79 except Exception: 

80 super().__init__( 

81 f"Rekor returned an unknown error with HTTP {http_error.response.status_code}" 

82 ) 

83 else: 

84 super().__init__(f"Unexpected Rekor error: {http_error}") 

85 

86 

87class _Endpoint(ABC): 

88 def __init__(self, url: str, session: requests.Session) -> None: 

89 self.url = url 

90 self.session = session 

91 

92 

93class RekorLog(_Endpoint): 

94 """ 

95 Represents a Rekor instance's log endpoint. 

96 """ 

97 

98 def get(self) -> RekorLogInfo: 

99 """ 

100 Returns information about the Rekor instance's log. 

101 """ 

102 resp: requests.Response = self.session.get(self.url) 

103 try: 

104 resp.raise_for_status() 

105 except requests.HTTPError as http_error: 

106 raise RekorClientError(http_error) 

107 return RekorLogInfo.from_response(resp.json()) 

108 

109 @property 

110 def entries(self) -> RekorEntries: 

111 """ 

112 Returns a `RekorEntries` capable of accessing detailed information 

113 about individual log entries. 

114 """ 

115 return RekorEntries(urljoin(self.url, "entries/"), session=self.session) 

116 

117 

118class RekorEntries(_Endpoint): 

119 """ 

120 Represents the individual log entry endpoints on a Rekor instance. 

121 """ 

122 

123 def get( 

124 self, *, uuid: Optional[str] = None, log_index: Optional[int] = None 

125 ) -> LogEntry: 

126 """ 

127 Retrieve a specific log entry, either by UUID or by log index. 

128 

129 Either `uuid` or `log_index` must be present, but not both. 

130 """ 

131 if not (bool(uuid) ^ bool(log_index)): 

132 raise ValueError("uuid or log_index required, but not both") 

133 

134 resp: requests.Response 

135 

136 if uuid is not None: 

137 resp = self.session.get(urljoin(self.url, uuid)) 

138 else: 

139 resp = self.session.get(self.url, params={"logIndex": log_index}) 

140 

141 try: 

142 resp.raise_for_status() 

143 except requests.HTTPError as http_error: 

144 raise RekorClientError(http_error) 

145 return LogEntry._from_response(resp.json()) 

146 

147 def post( 

148 self, 

149 proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse, 

150 ) -> LogEntry: 

151 """ 

152 Submit a new entry for inclusion in the Rekor log. 

153 """ 

154 

155 payload = proposed_entry.model_dump(mode="json", by_alias=True) 

156 _logger.debug(f"proposed: {json.dumps(payload)}") 

157 

158 resp: requests.Response = self.session.post(self.url, json=payload) 

159 try: 

160 resp.raise_for_status() 

161 except requests.HTTPError as http_error: 

162 raise RekorClientError(http_error) 

163 

164 integrated_entry = resp.json() 

165 _logger.debug(f"integrated: {integrated_entry}") 

166 return LogEntry._from_response(integrated_entry) 

167 

168 @property 

169 def retrieve(self) -> RekorEntriesRetrieve: 

170 """ 

171 Returns a `RekorEntriesRetrieve` capable of retrieving entries. 

172 """ 

173 return RekorEntriesRetrieve( 

174 urljoin(self.url, "retrieve/"), session=self.session 

175 ) 

176 

177 

178class RekorEntriesRetrieve(_Endpoint): 

179 """ 

180 Represents the entry retrieval endpoints on a Rekor instance. 

181 """ 

182 

183 def post( 

184 self, 

185 expected_entry: rekor_types.Hashedrekord | rekor_types.Dsse, 

186 ) -> Optional[LogEntry]: 

187 """ 

188 Retrieves an extant Rekor entry, identified by its artifact signature, 

189 artifact hash, and signing certificate. 

190 

191 Returns None if Rekor has no entry corresponding to the signing 

192 materials. 

193 """ 

194 data = {"entries": [expected_entry.model_dump(mode="json", by_alias=True)]} 

195 

196 resp: requests.Response = self.session.post(self.url, json=data) 

197 try: 

198 resp.raise_for_status() 

199 except requests.HTTPError as http_error: 

200 if http_error.response and http_error.response.status_code == 404: 

201 return None 

202 raise RekorClientError(http_error) 

203 

204 results = resp.json() 

205 

206 # The response is a list of `{uuid: LogEntry}` objects. 

207 # We select the oldest entry for our actual return value, 

208 # since a malicious actor could conceivably spam the log with 

209 # newer duplicate entries. 

210 oldest_entry: Optional[LogEntry] = None 

211 for result in results: 

212 entry = LogEntry._from_response(result) 

213 if ( 

214 oldest_entry is None 

215 or entry.integrated_time < oldest_entry.integrated_time 

216 ): 

217 oldest_entry = entry 

218 

219 return oldest_entry 

220 

221 

222class RekorClient: 

223 """The internal Rekor client""" 

224 

225 def __init__(self, url: str) -> None: 

226 """ 

227 Create a new `RekorClient` from the given URL. 

228 """ 

229 self.url = urljoin(url, "api/v1/") 

230 self.session = requests.Session() 

231 self.session.headers.update( 

232 { 

233 "Content-Type": "application/json", 

234 "Accept": "application/json", 

235 "User-Agent": USER_AGENT, 

236 } 

237 ) 

238 

239 def __del__(self) -> None: 

240 """ 

241 Terminates the underlying network session. 

242 """ 

243 self.session.close() 

244 

245 @classmethod 

246 def production(cls) -> RekorClient: 

247 """ 

248 Returns a `RekorClient` populated with the default Rekor production instance. 

249 """ 

250 return cls( 

251 DEFAULT_REKOR_URL, 

252 ) 

253 

254 @classmethod 

255 def staging(cls) -> RekorClient: 

256 """ 

257 Returns a `RekorClient` populated with the default Rekor staging instance. 

258 """ 

259 return cls(STAGING_REKOR_URL) 

260 

261 @property 

262 def log(self) -> RekorLog: 

263 """ 

264 Returns a `RekorLog` adapter for making requests to a Rekor log. 

265 """ 

266 return RekorLog(urljoin(self.url, "log/"), session=self.session)