1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Client implementation for interacting with Rekor.
17"""
18
19from __future__ import annotations
20
21import json
22import logging
23from abc import ABC
24from dataclasses import dataclass
25from typing import Any, Optional
26from urllib.parse import urljoin
27
28import rekor_types
29import requests
30
31from sigstore._internal import USER_AGENT
32from sigstore.models import LogEntry
33
34_logger = logging.getLogger(__name__)
35
36DEFAULT_REKOR_URL = "https://rekor.sigstore.dev"
37STAGING_REKOR_URL = "https://rekor.sigstage.dev"
38
39
40@dataclass(frozen=True)
41class RekorLogInfo:
42 """
43 Represents information about the Rekor log.
44 """
45
46 root_hash: str
47 tree_size: int
48 signed_tree_head: str
49 tree_id: str
50 raw_data: dict[str, Any]
51
52 @classmethod
53 def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:
54 """
55 Create a new `RekorLogInfo` from the given API response.
56 """
57 return cls(
58 root_hash=dict_["rootHash"],
59 tree_size=dict_["treeSize"],
60 signed_tree_head=dict_["signedTreeHead"],
61 tree_id=dict_["treeID"],
62 raw_data=dict_,
63 )
64
65
66class RekorClientError(Exception):
67 """
68 A generic error in the Rekor client.
69 """
70
71 def __init__(self, http_error: requests.HTTPError):
72 """
73 Create a new `RekorClientError` from the given `requests.HTTPError`.
74 """
75 if http_error.response is not None:
76 try:
77 error = rekor_types.Error.model_validate_json(http_error.response.text)
78 super().__init__(f"{error.code}: {error.message}")
79 except Exception:
80 super().__init__(
81 f"Rekor returned an unknown error with HTTP {http_error.response.status_code}"
82 )
83 else:
84 super().__init__(f"Unexpected Rekor error: {http_error}")
85
86
87class _Endpoint(ABC):
88 def __init__(self, url: str, session: requests.Session) -> None:
89 self.url = url
90 self.session = session
91
92
93class RekorLog(_Endpoint):
94 """
95 Represents a Rekor instance's log endpoint.
96 """
97
98 def get(self) -> RekorLogInfo:
99 """
100 Returns information about the Rekor instance's log.
101 """
102 resp: requests.Response = self.session.get(self.url)
103 try:
104 resp.raise_for_status()
105 except requests.HTTPError as http_error:
106 raise RekorClientError(http_error)
107 return RekorLogInfo.from_response(resp.json())
108
109 @property
110 def entries(self) -> RekorEntries:
111 """
112 Returns a `RekorEntries` capable of accessing detailed information
113 about individual log entries.
114 """
115 return RekorEntries(urljoin(self.url, "entries/"), session=self.session)
116
117
118class RekorEntries(_Endpoint):
119 """
120 Represents the individual log entry endpoints on a Rekor instance.
121 """
122
123 def get(
124 self, *, uuid: Optional[str] = None, log_index: Optional[int] = None
125 ) -> LogEntry:
126 """
127 Retrieve a specific log entry, either by UUID or by log index.
128
129 Either `uuid` or `log_index` must be present, but not both.
130 """
131 if not (bool(uuid) ^ bool(log_index)):
132 raise ValueError("uuid or log_index required, but not both")
133
134 resp: requests.Response
135
136 if uuid is not None:
137 resp = self.session.get(urljoin(self.url, uuid))
138 else:
139 resp = self.session.get(self.url, params={"logIndex": log_index})
140
141 try:
142 resp.raise_for_status()
143 except requests.HTTPError as http_error:
144 raise RekorClientError(http_error)
145 return LogEntry._from_response(resp.json())
146
147 def post(
148 self,
149 proposed_entry: rekor_types.Hashedrekord | rekor_types.Dsse,
150 ) -> LogEntry:
151 """
152 Submit a new entry for inclusion in the Rekor log.
153 """
154
155 payload = proposed_entry.model_dump(mode="json", by_alias=True)
156 _logger.debug(f"proposed: {json.dumps(payload)}")
157
158 resp: requests.Response = self.session.post(self.url, json=payload)
159 try:
160 resp.raise_for_status()
161 except requests.HTTPError as http_error:
162 raise RekorClientError(http_error)
163
164 integrated_entry = resp.json()
165 _logger.debug(f"integrated: {integrated_entry}")
166 return LogEntry._from_response(integrated_entry)
167
168 @property
169 def retrieve(self) -> RekorEntriesRetrieve:
170 """
171 Returns a `RekorEntriesRetrieve` capable of retrieving entries.
172 """
173 return RekorEntriesRetrieve(
174 urljoin(self.url, "retrieve/"), session=self.session
175 )
176
177
178class RekorEntriesRetrieve(_Endpoint):
179 """
180 Represents the entry retrieval endpoints on a Rekor instance.
181 """
182
183 def post(
184 self,
185 expected_entry: rekor_types.Hashedrekord | rekor_types.Dsse,
186 ) -> Optional[LogEntry]:
187 """
188 Retrieves an extant Rekor entry, identified by its artifact signature,
189 artifact hash, and signing certificate.
190
191 Returns None if Rekor has no entry corresponding to the signing
192 materials.
193 """
194 data = {"entries": [expected_entry.model_dump(mode="json", by_alias=True)]}
195
196 resp: requests.Response = self.session.post(self.url, json=data)
197 try:
198 resp.raise_for_status()
199 except requests.HTTPError as http_error:
200 if http_error.response and http_error.response.status_code == 404:
201 return None
202 raise RekorClientError(http_error)
203
204 results = resp.json()
205
206 # The response is a list of `{uuid: LogEntry}` objects.
207 # We select the oldest entry for our actual return value,
208 # since a malicious actor could conceivably spam the log with
209 # newer duplicate entries.
210 oldest_entry: Optional[LogEntry] = None
211 for result in results:
212 entry = LogEntry._from_response(result)
213 if (
214 oldest_entry is None
215 or entry.integrated_time < oldest_entry.integrated_time
216 ):
217 oldest_entry = entry
218
219 return oldest_entry
220
221
222class RekorClient:
223 """The internal Rekor client"""
224
225 def __init__(self, url: str) -> None:
226 """
227 Create a new `RekorClient` from the given URL.
228 """
229 self.url = urljoin(url, "api/v1/")
230 self.session = requests.Session()
231 self.session.headers.update(
232 {
233 "Content-Type": "application/json",
234 "Accept": "application/json",
235 "User-Agent": USER_AGENT,
236 }
237 )
238
239 def __del__(self) -> None:
240 """
241 Terminates the underlying network session.
242 """
243 self.session.close()
244
245 @classmethod
246 def production(cls) -> RekorClient:
247 """
248 Returns a `RekorClient` populated with the default Rekor production instance.
249 """
250 return cls(
251 DEFAULT_REKOR_URL,
252 )
253
254 @classmethod
255 def staging(cls) -> RekorClient:
256 """
257 Returns a `RekorClient` populated with the default Rekor staging instance.
258 """
259 return cls(STAGING_REKOR_URL)
260
261 @property
262 def log(self) -> RekorLog:
263 """
264 Returns a `RekorLog` adapter for making requests to a Rekor log.
265 """
266 return RekorLog(urljoin(self.url, "log/"), session=self.session)