1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Client implementation for interacting with Rekor (v1).
17"""
18
19from __future__ import annotations
20
21import base64
22import json
23import logging
24from abc import ABC
25from dataclasses import dataclass
26from typing import Any
27
28import rekor_types
29import requests
30from cryptography.hazmat.primitives import serialization
31from cryptography.x509 import Certificate
32
33from sigstore._internal import USER_AGENT
34from sigstore._internal.rekor import (
35 EntryRequestBody,
36 RekorClientError,
37 RekorLogSubmitter,
38)
39from sigstore.dsse import Envelope
40from sigstore.hashes import Hashed
41from sigstore.models import TransparencyLogEntry
42
43_logger = logging.getLogger(__name__)
44
45DEFAULT_REKOR_URL = "https://rekor.sigstore.dev"
46STAGING_REKOR_URL = "https://rekor.sigstage.dev"
47
48
49@dataclass(frozen=True)
50class RekorLogInfo:
51 """
52 Represents information about the Rekor log.
53 """
54
55 root_hash: str
56 tree_size: int
57 signed_tree_head: str
58 tree_id: str
59 raw_data: dict[str, Any]
60
61 @classmethod
62 def from_response(cls, dict_: dict[str, Any]) -> RekorLogInfo:
63 """
64 Create a new `RekorLogInfo` from the given API response.
65 """
66 return cls(
67 root_hash=dict_["rootHash"],
68 tree_size=dict_["treeSize"],
69 signed_tree_head=dict_["signedTreeHead"],
70 tree_id=dict_["treeID"],
71 raw_data=dict_,
72 )
73
74
75class _Endpoint(ABC):
76 def __init__(self, url: str, session: requests.Session | None = None) -> None:
77 # Note that _Endpoint may not be thread be safe if the same Session is provided
78 # to an _Endpoint in multiple threads
79 self.url = url
80 if session is None:
81 session = requests.Session()
82 session.headers.update(
83 {
84 "Content-Type": "application/json",
85 "Accept": "application/json",
86 "User-Agent": USER_AGENT,
87 }
88 )
89
90 self.session = session
91
92
93class RekorLog(_Endpoint):
94 """
95 Represents a Rekor instance's log endpoint.
96 """
97
98 def get(self) -> RekorLogInfo:
99 """
100 Returns information about the Rekor instance's log.
101 """
102 resp: requests.Response = self.session.get(self.url)
103 try:
104 resp.raise_for_status()
105 except requests.HTTPError as http_error:
106 raise RekorClientError(http_error)
107 return RekorLogInfo.from_response(resp.json())
108
109 @property
110 def entries(self) -> RekorEntries:
111 """
112 Returns a `RekorEntries` capable of accessing detailed information
113 about individual log entries.
114 """
115 return RekorEntries(f"{self.url}/entries", session=self.session)
116
117
118class RekorEntries(_Endpoint):
119 """
120 Represents the individual log entry endpoints on a Rekor instance.
121 """
122
123 def get(
124 self, *, uuid: str | None = None, log_index: int | None = None
125 ) -> TransparencyLogEntry:
126 """
127 Retrieve a specific log entry, either by UUID or by log index.
128
129 Either `uuid` or `log_index` must be present, but not both.
130 """
131 if not (bool(uuid) ^ bool(log_index)):
132 raise ValueError("uuid or log_index required, but not both")
133
134 resp: requests.Response
135
136 if uuid is not None:
137 resp = self.session.get(f"{self.url}/{uuid}")
138 else:
139 resp = self.session.get(self.url, params={"logIndex": log_index})
140
141 try:
142 resp.raise_for_status()
143 except requests.HTTPError as http_error:
144 raise RekorClientError(http_error)
145 return TransparencyLogEntry._from_v1_response(resp.json())
146
147 def post(
148 self,
149 payload: EntryRequestBody,
150 ) -> TransparencyLogEntry:
151 """
152 Submit a new entry for inclusion in the Rekor log.
153 """
154
155 _logger.debug(f"proposed: {json.dumps(payload)}")
156
157 resp: requests.Response = self.session.post(self.url, json=payload)
158 try:
159 resp.raise_for_status()
160 except requests.HTTPError as http_error:
161 raise RekorClientError(http_error)
162
163 integrated_entry = resp.json()
164 _logger.debug(f"integrated: {integrated_entry}")
165 return TransparencyLogEntry._from_v1_response(integrated_entry)
166
167 @property
168 def retrieve(self) -> RekorEntriesRetrieve:
169 """
170 Returns a `RekorEntriesRetrieve` capable of retrieving entries.
171 """
172 return RekorEntriesRetrieve(f"{self.url}/retrieve/", session=self.session)
173
174
175class RekorEntriesRetrieve(_Endpoint):
176 """
177 Represents the entry retrieval endpoints on a Rekor instance.
178 """
179
180 def post(
181 self,
182 expected_entry: rekor_types.Hashedrekord | rekor_types.Dsse,
183 ) -> TransparencyLogEntry | None:
184 """
185 Retrieves an extant Rekor entry, identified by its artifact signature,
186 artifact hash, and signing certificate.
187
188 Returns None if Rekor has no entry corresponding to the signing
189 materials.
190 """
191 data = {"entries": [expected_entry.model_dump(mode="json", by_alias=True)]}
192
193 resp: requests.Response = self.session.post(self.url, json=data)
194 try:
195 resp.raise_for_status()
196 except requests.HTTPError as http_error:
197 if http_error.response and http_error.response.status_code == 404:
198 return None
199 raise RekorClientError(http_error)
200
201 results = resp.json()
202
203 # The response is a list of `{uuid: LogEntry}` objects.
204 # We select the oldest entry for our actual return value,
205 # since a malicious actor could conceivably spam the log with
206 # newer duplicate entries.
207 oldest_entry: TransparencyLogEntry | None = None
208 for result in results:
209 entry = TransparencyLogEntry._from_v1_response(result)
210
211 # We expect every entry in Rekor v1 to have an integrated time.
212 if entry._inner.integrated_time is None:
213 raise ValueError(
214 f"Rekor v1 gave us an entry without an integrated time: {entry._inner.log_index}"
215 )
216
217 if (
218 oldest_entry is None
219 or entry._inner.integrated_time < oldest_entry._inner.integrated_time # type: ignore[operator]
220 ):
221 oldest_entry = entry
222
223 return oldest_entry
224
225
226class RekorClient(RekorLogSubmitter):
227 """The internal Rekor client"""
228
229 def __init__(self, url: str) -> None:
230 """
231 Create a new `RekorClient` from the given URL.
232 """
233 self.url = f"{url}/api/v1"
234
235 @classmethod
236 def production(cls) -> RekorClient:
237 """
238 Returns a `RekorClient` populated with the default Rekor production instance.
239 """
240 return cls(
241 DEFAULT_REKOR_URL,
242 )
243
244 @classmethod
245 def staging(cls) -> RekorClient:
246 """
247 Returns a `RekorClient` populated with the default Rekor staging instance.
248 """
249 return cls(STAGING_REKOR_URL)
250
251 @property
252 def log(self) -> RekorLog:
253 """
254 Returns a `RekorLog` adapter for making requests to a Rekor log.
255 """
256
257 return RekorLog(f"{self.url}/log")
258
259 def create_entry(self, request: EntryRequestBody) -> TransparencyLogEntry:
260 """
261 Submit the request to Rekor.
262 """
263 return self.log.entries.post(request)
264
265 def _build_hashed_rekord_request( # type: ignore[override]
266 self, hashed_input: Hashed, signature: bytes, certificate: Certificate
267 ) -> EntryRequestBody:
268 """
269 Construct a hashed rekord payload to submit to Rekor.
270 """
271 rekord = rekor_types.Hashedrekord(
272 spec=rekor_types.hashedrekord.HashedrekordV001Schema(
273 signature=rekor_types.hashedrekord.Signature(
274 content=base64.b64encode(signature).decode(),
275 public_key=rekor_types.hashedrekord.PublicKey(
276 content=base64.b64encode(
277 certificate.public_bytes(
278 encoding=serialization.Encoding.PEM
279 )
280 ).decode()
281 ),
282 ),
283 data=rekor_types.hashedrekord.Data(
284 hash=rekor_types.hashedrekord.Hash(
285 algorithm=hashed_input._as_hashedrekord_algorithm(),
286 value=hashed_input.digest.hex(),
287 )
288 ),
289 ),
290 )
291 return EntryRequestBody(rekord.model_dump(mode="json", by_alias=True))
292
293 def _build_dsse_request( # type: ignore[override]
294 self, envelope: Envelope, certificate: Certificate
295 ) -> EntryRequestBody:
296 """
297 Construct a dsse request to submit to Rekor.
298 """
299 dsse = rekor_types.Dsse(
300 spec=rekor_types.dsse.DsseSchema(
301 # NOTE: mypy can't see that this kwarg is correct due to two interacting
302 # behaviors/bugs (one pydantic, one datamodel-codegen):
303 # See: <https://github.com/pydantic/pydantic/discussions/7418#discussioncomment-9024927>
304 # See: <https://github.com/koxudaxi/datamodel-code-generator/issues/1903>
305 proposed_content=rekor_types.dsse.ProposedContent( # type: ignore[call-arg]
306 envelope=envelope.to_json(),
307 verifiers=[
308 base64.b64encode(
309 certificate.public_bytes(
310 encoding=serialization.Encoding.PEM
311 )
312 ).decode()
313 ],
314 ),
315 ),
316 )
317 return EntryRequestBody(dsse.model_dump(mode="json", by_alias=True))