Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/fulcio/client.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

86 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Client implementation for interacting with Fulcio. 

17""" 

18 

19from __future__ import annotations 

20 

21import base64 

22import json 

23import logging 

24from abc import ABC 

25from dataclasses import dataclass 

26from urllib.parse import urljoin 

27 

28import requests 

29from cryptography.hazmat.primitives import serialization 

30from cryptography.x509 import ( 

31 Certificate, 

32 CertificateSigningRequest, 

33 load_pem_x509_certificate, 

34) 

35 

36from sigstore._internal import USER_AGENT 

37from sigstore._utils import B64Str 

38from sigstore.oidc import IdentityToken 

39 

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# URL paths of the Fulcio v2 REST endpoints; `FulcioClient` joins these onto
# its base URL with `urljoin`.
SIGNING_CERT_ENDPOINT = "/api/v2/signingCert"
TRUST_BUNDLE_ENDPOINT = "/api/v2/trustBundle"

44 

45 

class ExpiredCertificate(Exception):
    """Signals that a certificate is past its validity period."""

48 

49 

@dataclass(frozen=True)
class FulcioCertificateSigningResponse:
    """Immutable result of a signing-certificate request."""

    # The first certificate of the returned chain.
    cert: Certificate
    # The remaining certificates of the returned chain, in response order.
    chain: list[Certificate]

56 

57 

@dataclass(frozen=True)
class FulcioTrustBundleResponse:
    """Immutable trust-bundle result: a list of certificate chains."""

    # One inner list per certificate chain.
    trust_bundle: list[list[Certificate]]

63 

64 

class FulcioClientError(Exception):
    """
    The error type raised for failures in the Fulcio client.
    """

71 

72 

class _Endpoint(ABC):
    """Base class for endpoint wrappers: holds a URL and an HTTP session."""

    def __init__(self, url: str, session: requests.Session) -> None:
        """Store the session and the endpoint URL on the instance."""
        self.session = session
        self.url = url

77 

78 

def _serialize_cert_request(req: CertificateSigningRequest) -> str:
    """
    Build the JSON request body for a signing-certificate request.

    The CSR is serialized to PEM, base64-encoded, and stored under the
    `certificateSigningRequest` key of a JSON object.
    """
    pem_bytes = req.public_bytes(serialization.Encoding.PEM)
    encoded_csr = B64Str(base64.b64encode(pem_bytes).decode())
    return json.dumps({"certificateSigningRequest": encoded_csr})

86 

87 

class FulcioSigningCert(_Endpoint):
    """
    Fulcio REST API signing certificate functionality.
    """

    @staticmethod
    def _error_message(response: requests.Response | None) -> object | None:
        """
        Return the `message` field of a JSON error body, or `None` when the
        response is absent, is not JSON, or is not an object with that field.
        """
        # NOTE: compare against None explicitly. `bool(response)` is
        # `response.ok`, which is False for every 4xx/5xx response.
        if response is None:
            return None
        try:
            body = json.loads(response.text)
        except ValueError:
            # Error body was not JSON (json.JSONDecodeError is a ValueError).
            return None
        if isinstance(body, dict):
            return body.get("message")
        return None

    def post(
        self, req: CertificateSigningRequest, identity: IdentityToken
    ) -> FulcioCertificateSigningResponse:
        """
        Get the signing certificate, using an X.509 Certificate
        Signing Request.

        The CSR is POSTed to this endpoint's URL with `identity` as a bearer
        token. The first certificate in the returned chain becomes `cert`;
        the remaining ones become `chain`.

        Raises `FulcioClientError` if the HTTP request fails (carrying the
        server's `message` when the error body provides one), if the
        response has no certificate chain, or if the chain holds fewer than
        two certificates.
        """
        headers = {
            "Authorization": f"Bearer {identity}",
            "Content-Type": "application/json",
            # NOTE(review): the body is parsed as JSON below even though a
            # PEM chain is requested here; confirm against the Fulcio API.
            "Accept": "application/pem-certificate-chain",
        }
        resp: requests.Response = self.session.post(
            url=self.url, data=_serialize_cert_request(req), headers=headers
        )
        try:
            resp.raise_for_status()
        except requests.HTTPError as http_error:
            # See if we can optionally add a message
            message = self._error_message(http_error.response)
            if message is not None:
                raise FulcioClientError(message) from http_error
            raise FulcioClientError from http_error

        try:
            certificates = resp.json()["signedCertificateEmbeddedSct"]["chain"][
                "certificates"
            ]
        except (KeyError, TypeError):
            # TypeError covers a level that is null or not a JSON object.
            raise FulcioClientError("Fulcio response missing certificate chain")

        # Cryptography doesn't have chain verification/building built in
        # https://github.com/pyca/cryptography/issues/2381
        if len(certificates) < 2:
            raise FulcioClientError(
                f"Certificate chain is too short: {len(certificates)} < 2"
            )
        cert = load_pem_x509_certificate(certificates[0].encode())
        chain = [load_pem_x509_certificate(c.encode()) for c in certificates[1:]]

        return FulcioCertificateSigningResponse(cert, chain)

135 

136 

class FulcioTrustBundle(_Endpoint):
    """
    Fulcio REST API trust bundle functionality.
    """

    def get(self) -> FulcioTrustBundleResponse:
        """
        Fetch the trust bundle and parse every PEM certificate in it.

        Raises `FulcioClientError` if the HTTP request fails.
        """
        response: requests.Response = self.session.get(self.url)
        try:
            response.raise_for_status()
        except requests.HTTPError as exc:
            raise FulcioClientError from exc

        payload = response.json()
        # One inner list per entry of "chains", preserving response order.
        bundle: list[list[Certificate]] = [
            [load_pem_x509_certificate(pem.encode()) for pem in entry["certificates"]]
            for entry in payload["chains"]
        ]
        return FulcioTrustBundleResponse(bundle)

159 

160 

class FulcioClient:
    """Internal client for a Fulcio instance, sharing one HTTP session."""

    def __init__(self, url: str) -> None:
        """Record the base URL and open a session with the User-Agent set."""
        _logger.debug(f"Fulcio client using URL: {url}")
        self.url = url
        self.session = requests.Session()
        self.session.headers["User-Agent"] = USER_AGENT

    def __del__(self) -> None:
        """
        Close the underlying network session when the client is destroyed.
        """
        self.session.close()

    @property
    def signing_cert(self) -> FulcioSigningCert:
        """
        Returns a wrapper for the signing certificate endpoint that reuses
        this client's session.
        """
        endpoint_url = urljoin(self.url, SIGNING_CERT_ENDPOINT)
        return FulcioSigningCert(endpoint_url, session=self.session)

    @property
    def trust_bundle(self) -> FulcioTrustBundle:
        """
        Returns a wrapper for the trust bundle endpoint that reuses this
        client's session.
        """
        endpoint_url = urljoin(self.url, TRUST_BUNDLE_ENDPOINT)
        return FulcioTrustBundle(endpoint_url, session=self.session)