Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/sct.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

88 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Utilities for verifying signed certificate timestamps. 

17""" 

18 

19import logging 

20import struct 

21from datetime import timezone 

22from typing import Optional 

23 

24from cryptography.hazmat.primitives import hashes, serialization 

25from cryptography.hazmat.primitives.asymmetric import ec, rsa 

26from cryptography.x509 import ( 

27 Certificate, 

28 ExtendedKeyUsage, 

29 ExtensionNotFound, 

30 PrecertificateSignedCertificateTimestamps, 

31) 

32from cryptography.x509.certificate_transparency import ( 

33 LogEntryType, 

34 SignedCertificateTimestamp, 

35) 

36from cryptography.x509.oid import ExtendedKeyUsageOID 

37 

38from sigstore._internal.trust import CTKeyring 

39from sigstore._utils import ( 

40 KeyID, 

41 cert_is_ca, 

42 key_id, 

43) 

44from sigstore.errors import VerificationError 

45 

46_logger = logging.getLogger(__name__) 

47 

48 

def _pack_signed_entry(
    sct: SignedCertificateTimestamp, cert: Certificate, issuer_key_id: Optional[bytes]
) -> bytes:
    """
    Build the `signed_entry` bytes for `sct`, selected by its entry type.

    For an X.509 entry the result is a 3-byte big-endian length followed by
    the DER encoding of `cert`. For a precertificate entry the result is the
    32-byte `issuer_key_id`, then a 3-byte big-endian length, then
    `cert.tbs_precertificate_bytes`.

    Raises `VerificationError` if the entry type is unknown, if a
    precertificate entry is requested without a 32-byte `issuer_key_id`,
    or if the certificate bytes do not fit in a 24-bit length.
    """
    if sct.entry_type == LogEntryType.X509_CERTIFICATE:
        # When dealing with a "normal" certificate, our signed entry looks like this:
        #
        # [0]: opaque ASN.1Cert<1..2^24-1>
        prefix = b""
        cert_der = cert.public_bytes(encoding=serialization.Encoding.DER)
    elif sct.entry_type == LogEntryType.PRE_CERTIFICATE:
        if not issuer_key_id or len(issuer_key_id) != 32:
            raise VerificationError("API misuse: issuer key ID missing")

        # When dealing with a precertificate, our signed entry looks like this:
        #
        # [0]: issuer_key_id[32]
        # [1]: opaque TBSCertificate<1..2^24-1>
        prefix = bytes(issuer_key_id)

        # Precertificates must have their SCT list extension filtered out.
        cert_der = cert.tbs_precertificate_bytes
    else:
        raise VerificationError(f"unknown SCT log entry type: {sct.entry_type!r}")

    # The `opaque` length is a u24. `int.to_bytes` produces the three
    # big-endian bytes directly and raises `OverflowError` for any length
    # that does not fit, however large, so oversized inputs always surface
    # as a `VerificationError`.
    try:
        length_prefix = len(cert_der).to_bytes(3, "big")
    except OverflowError:
        raise VerificationError(
            f"Unexpectedly large certificate length: {len(cert_der)}"
        ) from None

    return b"".join((prefix, length_prefix, cert_der))

90 

91 

def _pack_digitally_signed(
    sct: SignedCertificateTimestamp,
    cert: Certificate,
    issuer_key_id: Optional[KeyID],
) -> bytes:
    """
    Serialize the payload covered by an SCT's signature.

    The payload combines fields taken from `sct` (version, timestamp,
    entry type) with the `signed_entry` built from `cert`, producing the
    body of the "digitally-signed" struct for the SCT.

    The format of the digitally signed data is described in IETF's RFC 6962.

    Raises `VerificationError` if the SCT carries any extension bytes.
    """

    # No extensions are currently specified, so any extension bytes at all
    # are treated as suspicious.
    if sct.extension_bytes:
        raise VerificationError("Unexpected trailing extension bytes")

    # The "core" `signed_entry` field: either the public bytes of the cert
    # or the TBSPrecertificate (with some filtering), depending on whether
    # the SCT is for a precertificate.
    entry = _pack_signed_entry(sct, cert, issuer_key_id)

    # The struct layout has the entry's length baked in.
    layout = "!BBQH" + str(len(entry)) + "sH"

    # The SCT's timestamp is interpreted as UTC and expressed in milliseconds.
    utc_timestamp = sct.timestamp.replace(tzinfo=timezone.utc)

    # fmt: off
    packed_fields = (
        sct.version.value,                       # sct_version
        0,                                       # signature_type (certificate_timestamp(0))
        int(utc_timestamp.timestamp() * 1000),   # timestamp (milliseconds)
        sct.entry_type.value,                    # entry_type (x509_entry(0) | precert_entry(1))
        entry,                                   # select(entry_type) -> signed_entry (see above)
        len(sct.extension_bytes),                # extensions (opaque CtExtensions<0..2^16-1>)
    )
    # fmt: on

    return struct.pack(layout, *packed_fields)

132 

133 

def _is_preissuer(issuer: Certificate) -> bool:
    """
    Return whether `issuer` lists the Certificate Transparency OID in its
    ExtendedKeyUsage extension.

    A certificate with no ExtendedKeyUsage extension at all yields `False`.
    """
    try:
        eku = issuer.extensions.get_extension_for_class(ExtendedKeyUsage)
    except ExtensionNotFound:
        # Without any EKU there certainly is no CT extended key usage.
        return False
    else:
        return ExtendedKeyUsageOID.CERTIFICATE_TRANSPARENCY in eku.value

142 

143 

def _get_issuer_cert(chain: list[Certificate]) -> Certificate:
    """
    Pick the issuer certificate out of `chain`.

    The first certificate in `chain` is the issuer, unless `_is_preissuer`
    reports true for it, in which case the second certificate is used
    instead.

    Raises `VerificationError` if `chain` is too short to contain the
    selected certificate, so that callers handling verification failures
    do not see a bare `IndexError`.
    """
    if not chain:
        raise VerificationError(
            "SCT verify: certificate chain is empty, cannot find issuer"
        )

    issuer = chain[0]
    if _is_preissuer(issuer):
        if len(chain) < 2:
            raise VerificationError(
                "SCT verify: certificate chain has no issuer after the preissuer"
            )
        issuer = chain[1]
    return issuer

149 

150 

def _get_signed_certificate_timestamp(
    certificate: Certificate,
) -> SignedCertificateTimestamp:
    """Retrieve the embedded SCT from the certificate.

    Raise VerificationError if the certificate has no
    PrecertificateSignedCertificateTimestamps extension, or if that
    extension does not contain exactly one SCT.
    """
    try:
        timestamps = certificate.extensions.get_extension_for_class(
            PrecertificateSignedCertificateTimestamps
        ).value
    except ExtensionNotFound as exc:
        # Chain explicitly so the lookup failure is recorded as the cause.
        raise VerificationError(
            "Certificate does not contain a signed certificate timestamp extension"
        ) from exc

    if len(timestamps) != 1:
        raise VerificationError(
            f"Expected one certificate timestamp, found {len(timestamps)}"
        )
    sct: SignedCertificateTimestamp = timestamps[0]
    return sct

173 

174 

def _cert_is_ca(cert: Certificate) -> bool:
    """
    Return `True` only if `cert_is_ca` accepts `cert` as a CA.

    A `VerificationError` raised by `cert_is_ca` is logged at debug level
    and reported as `False` rather than propagated. A falsy result from
    `cert_is_ca` is likewise reported as `False`.
    """
    _logger.debug(f"Found {cert.subject} as issuer, verifying if it is a ca")
    try:
        is_ca = cert_is_ca(cert)
    except VerificationError as e:
        _logger.debug(f"Invalid {cert.subject}: failed to validate as a CA: {e}")
        return False

    # NOTE(review): this relies on `sigstore._utils.cert_is_ca` returning a
    # bool (True only for CA certificates) -- confirm against that helper.
    # Previously the result was discarded, so any call that did not raise
    # was treated as "is a CA".
    if not is_ca:
        _logger.debug(f"Invalid {cert.subject}: not a CA certificate")
        return False
    return True

183 

184 

def verify_sct(
    cert: Certificate,
    chain: list[Certificate],
    ct_keyring: CTKeyring,
) -> None:
    """
    Verify a signed certificate timestamp.

    An SCT is verified by reconstructing its "digitally-signed" payload
    and verifying that the signature provided in the SCT is valid against
    one of the keys present in the CT keyring (i.e., the keys used by the CT
    log to sign SCTs).

    `cert` is the certificate carrying the embedded SCT. `chain` is only
    consulted when the SCT is for a precertificate, to locate the issuer.
    `ct_keyring.verify` performs the actual signature check.

    Returns `None` on success. Raises `VerificationError` if the SCT cannot
    be extracted, the issuer is not an acceptable CA with an RSA or EC key,
    the SCT's hash algorithm is not SHA256, or the keyring rejects the
    signature.
    """

    sct = _get_signed_certificate_timestamp(cert)

    issuer_key_id = None
    if sct.entry_type == LogEntryType.PRE_CERTIFICATE:
        # If we're verifying an SCT for a precertificate, we need to
        # find its issuer in the chain and calculate a hash over
        # its public key information, as part of the "binding" proof
        # that ties the issuer to the final certificate.
        issuer_cert = _get_issuer_cert(chain)
        issuer_pubkey = issuer_cert.public_key()

        if not _cert_is_ca(issuer_cert):
            raise VerificationError(
                f"SCT verify: Invalid issuer pubkey basicConstraint (not a CA): {issuer_pubkey}"
            )

        if not isinstance(issuer_pubkey, (rsa.RSAPublicKey, ec.EllipticCurvePublicKey)):
            raise VerificationError(
                f"SCT verify: invalid issuer pubkey format (not ECDSA or RSA): {issuer_pubkey}"
            )

        issuer_key_id = key_id(issuer_pubkey)

    digitally_signed = _pack_digitally_signed(sct, cert, issuer_key_id)

    if not isinstance(sct.signature_hash_algorithm, hashes.SHA256):
        raise VerificationError(
            "Found unexpected hash algorithm in SCT: only SHA256 is supported "
            f"(expected {hashes.SHA256}, got {sct.signature_hash_algorithm})"
        )

    _logger.debug("attempting to verify SCT with key ID %s", sct.log_id.hex())
    try:
        # NOTE(ww): In terms of the DER structure, the SCT's `LogID` contains a
        # singular `opaque key_id[32]`. Cryptography's APIs don't bother
        # to expose this trivial single member, so we use the `log_id`
        # attribute directly.
        ct_keyring.verify(
            key_id=KeyID(sct.log_id), signature=sct.signature, data=digitally_signed
        )
    except VerificationError as exc:
        # Chain explicitly so the keyring's error is recorded as the cause.
        raise VerificationError(f"SCT verify failed: {exc}") from exc