1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Client implementation for interacting with Fulcio.
17"""
18
19from __future__ import annotations
20
21import base64
22import json
23import logging
24from abc import ABC
25from dataclasses import dataclass
26from urllib.parse import urljoin
27
28import requests
29from cryptography.hazmat.primitives import serialization
30from cryptography.x509 import (
31 Certificate,
32 CertificateSigningRequest,
33 load_pem_x509_certificate,
34)
35
36from sigstore._internal import USER_AGENT
37from sigstore._utils import B64Str
38from sigstore.oidc import IdentityToken
39
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Endpoint paths that FulcioClient joins onto its base URL.
SIGNING_CERT_ENDPOINT = "/api/v2/signingCert"
TRUST_BUNDLE_ENDPOINT = "/api/v2/trustBundle"
44
45
class ExpiredCertificate(Exception):
    """
    Raised to signal that a certificate has expired.
    """
48
49
@dataclass(frozen=True)
class FulcioCertificateSigningResponse:
    """
    Immutable result of a signing certificate request: the first
    certificate of the returned chain, plus the certificates after it.
    """

    # First certificate in the chain returned by Fulcio.
    cert: Certificate
    # Remaining certificates of that chain, in the order returned.
    chain: list[Certificate]
56
57
@dataclass(frozen=True)
class FulcioTrustBundleResponse:
    """
    Immutable trust bundle result: a list of certificate chains.
    """

    # One inner list per certificate chain.
    trust_bundle: list[list[Certificate]]
63
64
class FulcioClientError(Exception):
    """Raised on any error in the Fulcio client."""
71
72
class _Endpoint(ABC):
    """Base class for endpoint wrappers: holds a URL and an HTTP session."""

    def __init__(self, url: str, session: requests.Session) -> None:
        """Store the endpoint `url` and the `session` on the instance."""
        self.url, self.session = url, session
77
78
def _serialize_cert_request(req: CertificateSigningRequest) -> str:
    """
    Build the JSON request body for a certificate signing request.

    The CSR is serialized to PEM, base64-encoded, and stored under the
    `certificateSigningRequest` key of the returned JSON object.
    """
    pem_bytes = req.public_bytes(serialization.Encoding.PEM)
    encoded_csr = B64Str(base64.b64encode(pem_bytes).decode())
    return json.dumps({"certificateSigningRequest": encoded_csr})
86
87
class FulcioSigningCert(_Endpoint):
    """
    Fulcio REST API signing certificate functionality.
    """

    def post(
        self, req: CertificateSigningRequest, identity: IdentityToken
    ) -> FulcioCertificateSigningResponse:
        """
        Get the signing certificate, using an X.509 Certificate
        Signing Request.

        The CSR is posted as a JSON body with `identity` as the bearer
        token. The first certificate of the returned chain becomes `cert`
        and the remaining ones become `chain` in the response object.

        Raises `FulcioClientError` if Fulcio answers with an HTTP error
        status (carrying the `message` from the error body when it has
        one), if the response body is not JSON with the expected
        structure, or if the chain has fewer than two certificates.
        """
        headers = {
            "Authorization": f"Bearer {identity}",
            "Content-Type": "application/json",
            "Accept": "application/pem-certificate-chain",
        }
        resp: requests.Response = self.session.post(
            url=self.url, data=_serialize_cert_request(req), headers=headers
        )
        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            # See if we can optionally add a message.
            #
            # NOTE: a `requests.Response` is falsy whenever its status is an
            # error (its truth value is `Response.ok`), which is always the
            # case here, so the response must be compared against None
            # instead of being tested for truthiness.
            error_response = http_error.response
            if error_response is not None:
                try:
                    error_body = json.loads(error_response.text)
                except ValueError:
                    # The error body is not JSON; fall back to a bare error.
                    error_body = None
                if isinstance(error_body, dict) and "message" in error_body:
                    raise FulcioClientError(error_body["message"]) from http_error
            raise FulcioClientError from http_error

        try:
            certificates = resp.json()["signedCertificateEmbeddedSct"]["chain"][
                "certificates"
            ]
        except (KeyError, TypeError, ValueError) as parse_error:
            # KeyError: an expected key is absent; TypeError: a level of the
            # body is not a JSON object; ValueError: the body is not JSON.
            raise FulcioClientError(
                "Fulcio response missing certificate chain"
            ) from parse_error

        # Cryptography doesn't have chain verification/building built in
        # https://github.com/pyca/cryptography/issues/2381
        if len(certificates) < 2:
            raise FulcioClientError(
                f"Certificate chain is too short: {len(certificates)} < 2"
            )
        cert = load_pem_x509_certificate(certificates[0].encode())
        chain = [load_pem_x509_certificate(c.encode()) for c in certificates[1:]]

        return FulcioCertificateSigningResponse(cert, chain)
135
136
class FulcioTrustBundle(_Endpoint):
    """
    Fulcio REST API trust bundle functionality.
    """

    def get(self) -> FulcioTrustBundleResponse:
        """
        Get the certificate chains from Fulcio.

        Each entry of the response's `chains` list yields one chain, built
        by loading every PEM string in that entry's `certificates` list.

        Raises `FulcioClientError` if Fulcio answers with an HTTP error
        status, or if the response body is not JSON with the expected
        `chains` / `certificates` structure.
        """
        resp: requests.Response = self.session.get(self.url)
        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            raise FulcioClientError from http_error

        # Pull out the PEM strings first so that a malformed response is
        # reported as a client error rather than a bare KeyError/ValueError.
        try:
            pem_chains = [entry["certificates"] for entry in resp.json()["chains"]]
        except (KeyError, TypeError, ValueError) as parse_error:
            raise FulcioClientError(
                "Fulcio response missing certificate chains"
            ) from parse_error

        chains: list[list[Certificate]] = [
            [load_pem_x509_certificate(pem.encode()) for pem in pem_chain]
            for pem_chain in pem_chains
        ]
        return FulcioTrustBundleResponse(chains)
159
160
class FulcioClient:
    """Internal client for a Fulcio instance."""

    def __init__(self, url: str) -> None:
        """
        Record the base URL and open an HTTP session that sends this
        package's User-Agent header.
        """
        _logger.debug(f"Fulcio client using URL: {url}")
        self.url = url

        session = requests.Session()
        session.headers["User-Agent"] = USER_AGENT
        self.session = session

    def __del__(self) -> None:
        """
        Closes the underlying network session.
        """
        self.session.close()

    @property
    def signing_cert(self) -> FulcioSigningCert:
        """
        Returns a `FulcioSigningCert` bound to this client's session and to
        the base URL joined with `SIGNING_CERT_ENDPOINT`.
        """
        endpoint_url = urljoin(self.url, SIGNING_CERT_ENDPOINT)
        return FulcioSigningCert(endpoint_url, session=self.session)

    @property
    def trust_bundle(self) -> FulcioTrustBundle:
        """
        Returns a `FulcioTrustBundle` bound to this client's session and to
        the base URL joined with `TRUST_BUNDLE_ENDPOINT`.
        """
        endpoint_url = urljoin(self.url, TRUST_BUNDLE_ENDPOINT)
        return FulcioTrustBundle(endpoint_url, session=self.session)