Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/fulcio/client.py: 50%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

94 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Client implementation for interacting with Fulcio. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import json 

23import logging 

24from abc import ABC 

25from dataclasses import dataclass 

26from urllib.parse import urljoin 

27 

28import requests 

29from cryptography.hazmat.primitives import serialization 

30from cryptography.x509 import ( 

31 Certificate, 

32 CertificateSigningRequest, 

33 load_pem_x509_certificate, 

34) 

35 

36from sigstore._internal import USER_AGENT 

37from sigstore._utils import B64Str 

38from sigstore.oidc import IdentityToken 

39 

40_logger = logging.getLogger(__name__) 

41 

42DEFAULT_FULCIO_URL = "https://fulcio.sigstore.dev" 

43STAGING_FULCIO_URL = "https://fulcio.sigstage.dev" 

44SIGNING_CERT_ENDPOINT = "/api/v2/signingCert" 

45TRUST_BUNDLE_ENDPOINT = "/api/v2/trustBundle" 

46 

47 

48class ExpiredCertificate(Exception): 

49 """An error raised when the Certificate is expired.""" 

50 

51 

52@dataclass(frozen=True) 

53class FulcioCertificateSigningResponse: 

54 """Certificate response""" 

55 

56 cert: Certificate 

57 chain: list[Certificate] 

58 

59 

60@dataclass(frozen=True) 

61class FulcioTrustBundleResponse: 

62 """Trust bundle response, containing a list of certificate chains""" 

63 

64 trust_bundle: list[list[Certificate]] 

65 

66 

67class FulcioClientError(Exception): 

68 """ 

69 Raised on any error in the Fulcio client. 

70 """ 

71 

72 pass 

73 

74 

75class _Endpoint(ABC): 

76 def __init__(self, url: str, session: requests.Session) -> None: 

77 self.url = url 

78 self.session = session 

79 

80 

81def _serialize_cert_request(req: CertificateSigningRequest) -> str: 

82 data = { 

83 "certificateSigningRequest": B64Str( 

84 base64.b64encode(req.public_bytes(serialization.Encoding.PEM)).decode() 

85 ) 

86 } 

87 return json.dumps(data) 

88 

89 

90class FulcioSigningCert(_Endpoint): 

91 """ 

92 Fulcio REST API signing certificate functionality. 

93 """ 

94 

95 def post( 

96 self, req: CertificateSigningRequest, identity: IdentityToken 

97 ) -> FulcioCertificateSigningResponse: 

98 """ 

99 Get the signing certificate, using an X.509 Certificate 

100 Signing Request. 

101 """ 

102 headers = { 

103 "Authorization": f"Bearer {identity}", 

104 "Content-Type": "application/json", 

105 "Accept": "application/pem-certificate-chain", 

106 } 

107 resp: requests.Response = self.session.post( 

108 url=self.url, data=_serialize_cert_request(req), headers=headers 

109 ) 

110 try: 

111 resp.raise_for_status() 

112 except requests.HTTPError as http_error: 

113 # See if we can optionally add a message 

114 if http_error.response: 

115 text = json.loads(http_error.response.text) 

116 if "message" in http_error.response.text: 

117 raise FulcioClientError(text["message"]) from http_error 

118 raise FulcioClientError from http_error 

119 

120 try: 

121 certificates = resp.json()["signedCertificateEmbeddedSct"]["chain"][ 

122 "certificates" 

123 ] 

124 except KeyError: 

125 raise FulcioClientError("Fulcio response missing certificate chain") 

126 

127 # Cryptography doesn't have chain verification/building built in 

128 # https://github.com/pyca/cryptography/issues/2381 

129 if len(certificates) < 2: 

130 raise FulcioClientError( 

131 f"Certificate chain is too short: {len(certificates)} < 2" 

132 ) 

133 cert = load_pem_x509_certificate(certificates[0].encode()) 

134 chain = [load_pem_x509_certificate(c.encode()) for c in certificates[1:]] 

135 

136 return FulcioCertificateSigningResponse(cert, chain) 

137 

138 

139class FulcioTrustBundle(_Endpoint): 

140 """ 

141 Fulcio REST API trust bundle functionality. 

142 """ 

143 

144 def get(self) -> FulcioTrustBundleResponse: 

145 """Get the certificate chains from Fulcio""" 

146 resp: requests.Response = self.session.get(self.url) 

147 try: 

148 resp.raise_for_status() 

149 except requests.HTTPError as http_error: 

150 raise FulcioClientError from http_error 

151 

152 trust_bundle_json = resp.json() 

153 chains: list[list[Certificate]] = [] 

154 for certificate_chain in trust_bundle_json["chains"]: 

155 chain: list[Certificate] = [] 

156 for certificate in certificate_chain["certificates"]: 

157 cert: Certificate = load_pem_x509_certificate(certificate.encode()) 

158 chain.append(cert) 

159 chains.append(chain) 

160 return FulcioTrustBundleResponse(chains) 

161 

162 

163class FulcioClient: 

164 """The internal Fulcio client""" 

165 

166 def __init__(self, url: str = DEFAULT_FULCIO_URL) -> None: 

167 """Initialize the client""" 

168 _logger.debug(f"Fulcio client using URL: {url}") 

169 self.url = url 

170 self.session = requests.Session() 

171 self.session.headers.update( 

172 { 

173 "User-Agent": USER_AGENT, 

174 } 

175 ) 

176 

177 def __del__(self) -> None: 

178 """ 

179 Destroys the underlying network session. 

180 """ 

181 self.session.close() 

182 

183 @classmethod 

184 def production(cls) -> FulcioClient: 

185 """ 

186 Returns a `FulcioClient` for the Sigstore production instance of Fulcio. 

187 """ 

188 return cls(DEFAULT_FULCIO_URL) 

189 

190 @classmethod 

191 def staging(cls) -> FulcioClient: 

192 """ 

193 Returns a `FulcioClient` for the Sigstore staging instance of Fulcio. 

194 """ 

195 return cls(STAGING_FULCIO_URL) 

196 

197 @property 

198 def signing_cert(self) -> FulcioSigningCert: 

199 """ 

200 Returns a model capable of interacting with Fulcio's signing certificate endpoints. 

201 """ 

202 return FulcioSigningCert( 

203 urljoin(self.url, SIGNING_CERT_ENDPOINT), session=self.session 

204 ) 

205 

206 @property 

207 def trust_bundle(self) -> FulcioTrustBundle: 

208 """ 

209 Returns a model capable of interacting with Fulcio's trust bundle endpoints. 

210 """ 

211 return FulcioTrustBundle( 

212 urljoin(self.url, TRUST_BUNDLE_ENDPOINT), session=self.session 

213 )