1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16APIs for interacting with Rekor.
17"""
18
19from __future__ import annotations
20
21import base64
22import typing
23from abc import ABC, abstractmethod
24
25import rekor_types
26import requests
27from cryptography.x509 import Certificate
28
29from sigstore._utils import base64_encode_pem_cert
30from sigstore.dsse import Envelope
31from sigstore.hashes import Hashed
32
33if typing.TYPE_CHECKING:
34 from sigstore.models import LogEntry
35
36__all__ = [
37 "_hashedrekord_from_parts",
38]
39
40EntryRequestBody = typing.NewType("EntryRequestBody", dict[str, typing.Any])
41
42
43class RekorClientError(Exception):
44 """
45 A generic error in the Rekor client.
46 """
47
48 def __init__(self, http_error: requests.HTTPError):
49 """
50 Create a new `RekorClientError` from the given `requests.HTTPError`.
51 """
52 if http_error.response is not None:
53 try:
54 error = rekor_types.Error.model_validate_json(http_error.response.text)
55 super().__init__(f"{error.code}: {error.message}")
56 except Exception:
57 super().__init__(
58 f"Rekor returned an unknown error with HTTP {http_error.response.status_code}"
59 )
60 else:
61 super().__init__(f"Unexpected Rekor error: {http_error}")
62
63
64class RekorLogSubmitter(ABC):
65 """
66 Abstract class to represent a Rekor log entry submitter.
67
68 Intended to be implemented by RekorClient and RekorV2Client.
69 """
70
71 @abstractmethod
72 def create_entry(
73 self,
74 request: EntryRequestBody,
75 ) -> LogEntry:
76 """
77 Submit the request to Rekor.
78 """
79 pass
80
81 @classmethod
82 @abstractmethod
83 def _build_hashed_rekord_request(
84 self, hashed_input: Hashed, signature: bytes, certificate: Certificate
85 ) -> EntryRequestBody:
86 """
87 Construct a hashed rekord request to submit to Rekor.
88 """
89 pass
90
91 @classmethod
92 @abstractmethod
93 def _build_dsse_request(
94 self, envelope: Envelope, certificate: Certificate
95 ) -> EntryRequestBody:
96 """
97 Construct a dsse request to submit to Rekor.
98 """
99 pass
100
101
102# TODO: This should probably live somewhere better.
103def _hashedrekord_from_parts(
104 cert: Certificate, sig: bytes, hashed: Hashed
105) -> rekor_types.Hashedrekord:
106 return rekor_types.Hashedrekord(
107 spec=rekor_types.hashedrekord.HashedrekordV001Schema(
108 signature=rekor_types.hashedrekord.Signature(
109 content=base64.b64encode(sig).decode(),
110 public_key=rekor_types.hashedrekord.PublicKey(
111 content=base64_encode_pem_cert(cert),
112 ),
113 ),
114 data=rekor_types.hashedrekord.Data(
115 hash=rekor_types.hashedrekord.Hash(
116 algorithm=hashed._as_hashedrekord_algorithm(),
117 value=hashed.digest.hex(),
118 )
119 ),
120 )
121 )