1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Client implementation for interacting with Fulcio.
17"""
18
19from __future__ import annotations
20
21import base64
22import json
23import logging
24from abc import ABC
25from dataclasses import dataclass
26from urllib.parse import urljoin
27
28import requests
29from cryptography.hazmat.primitives import serialization
30from cryptography.x509 import (
31 Certificate,
32 CertificateSigningRequest,
33 load_pem_x509_certificate,
34)
35
36from sigstore._internal import USER_AGENT
37from sigstore._utils import B64Str
38from sigstore.oidc import IdentityToken
39
40_logger = logging.getLogger(__name__)
41
42DEFAULT_FULCIO_URL = "https://fulcio.sigstore.dev"
43STAGING_FULCIO_URL = "https://fulcio.sigstage.dev"
44SIGNING_CERT_ENDPOINT = "/api/v2/signingCert"
45TRUST_BUNDLE_ENDPOINT = "/api/v2/trustBundle"
46
47
48class ExpiredCertificate(Exception):
49 """An error raised when the Certificate is expired."""
50
51
52@dataclass(frozen=True)
53class FulcioCertificateSigningResponse:
54 """Certificate response"""
55
56 cert: Certificate
57 chain: list[Certificate]
58
59
60@dataclass(frozen=True)
61class FulcioTrustBundleResponse:
62 """Trust bundle response, containing a list of certificate chains"""
63
64 trust_bundle: list[list[Certificate]]
65
66
67class FulcioClientError(Exception):
68 """
69 Raised on any error in the Fulcio client.
70 """
71
72 pass
73
74
75class _Endpoint(ABC):
76 def __init__(self, url: str, session: requests.Session) -> None:
77 self.url = url
78 self.session = session
79
80
81def _serialize_cert_request(req: CertificateSigningRequest) -> str:
82 data = {
83 "certificateSigningRequest": B64Str(
84 base64.b64encode(req.public_bytes(serialization.Encoding.PEM)).decode()
85 )
86 }
87 return json.dumps(data)
88
89
90class FulcioSigningCert(_Endpoint):
91 """
92 Fulcio REST API signing certificate functionality.
93 """
94
95 def post(
96 self, req: CertificateSigningRequest, identity: IdentityToken
97 ) -> FulcioCertificateSigningResponse:
98 """
99 Get the signing certificate, using an X.509 Certificate
100 Signing Request.
101 """
102 headers = {
103 "Authorization": f"Bearer {identity}",
104 "Content-Type": "application/json",
105 "Accept": "application/pem-certificate-chain",
106 }
107 resp: requests.Response = self.session.post(
108 url=self.url, data=_serialize_cert_request(req), headers=headers
109 )
110 try:
111 resp.raise_for_status()
112 except requests.HTTPError as http_error:
113 # See if we can optionally add a message
114 if http_error.response:
115 text = json.loads(http_error.response.text)
116 if "message" in http_error.response.text:
117 raise FulcioClientError(text["message"]) from http_error
118 raise FulcioClientError from http_error
119
120 try:
121 certificates = resp.json()["signedCertificateEmbeddedSct"]["chain"][
122 "certificates"
123 ]
124 except KeyError:
125 raise FulcioClientError("Fulcio response missing certificate chain")
126
127 # Cryptography doesn't have chain verification/building built in
128 # https://github.com/pyca/cryptography/issues/2381
129 if len(certificates) < 2:
130 raise FulcioClientError(
131 f"Certificate chain is too short: {len(certificates)} < 2"
132 )
133 cert = load_pem_x509_certificate(certificates[0].encode())
134 chain = [load_pem_x509_certificate(c.encode()) for c in certificates[1:]]
135
136 return FulcioCertificateSigningResponse(cert, chain)
137
138
139class FulcioTrustBundle(_Endpoint):
140 """
141 Fulcio REST API trust bundle functionality.
142 """
143
144 def get(self) -> FulcioTrustBundleResponse:
145 """Get the certificate chains from Fulcio"""
146 resp: requests.Response = self.session.get(self.url)
147 try:
148 resp.raise_for_status()
149 except requests.HTTPError as http_error:
150 raise FulcioClientError from http_error
151
152 trust_bundle_json = resp.json()
153 chains: list[list[Certificate]] = []
154 for certificate_chain in trust_bundle_json["chains"]:
155 chain: list[Certificate] = []
156 for certificate in certificate_chain["certificates"]:
157 cert: Certificate = load_pem_x509_certificate(certificate.encode())
158 chain.append(cert)
159 chains.append(chain)
160 return FulcioTrustBundleResponse(chains)
161
162
163class FulcioClient:
164 """The internal Fulcio client"""
165
166 def __init__(self, url: str = DEFAULT_FULCIO_URL) -> None:
167 """Initialize the client"""
168 _logger.debug(f"Fulcio client using URL: {url}")
169 self.url = url
170 self.session = requests.Session()
171 self.session.headers.update(
172 {
173 "User-Agent": USER_AGENT,
174 }
175 )
176
177 def __del__(self) -> None:
178 """
179 Destroys the underlying network session.
180 """
181 self.session.close()
182
183 @classmethod
184 def production(cls) -> FulcioClient:
185 """
186 Returns a `FulcioClient` for the Sigstore production instance of Fulcio.
187 """
188 return cls(DEFAULT_FULCIO_URL)
189
190 @classmethod
191 def staging(cls) -> FulcioClient:
192 """
193 Returns a `FulcioClient` for the Sigstore staging instance of Fulcio.
194 """
195 return cls(STAGING_FULCIO_URL)
196
197 @property
198 def signing_cert(self) -> FulcioSigningCert:
199 """
200 Returns a model capable of interacting with Fulcio's signing certificate endpoints.
201 """
202 return FulcioSigningCert(
203 urljoin(self.url, SIGNING_CERT_ENDPOINT), session=self.session
204 )
205
206 @property
207 def trust_bundle(self) -> FulcioTrustBundle:
208 """
209 Returns a model capable of interacting with Fulcio's trust bundle endpoints.
210 """
211 return FulcioTrustBundle(
212 urljoin(self.url, TRUST_BUNDLE_ENDPOINT), session=self.session
213 )