Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sigstore/_internal/sct.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

85 statements  

1# Copyright 2022 The Sigstore Authors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" 

16Utilities for verifying signed certificate timestamps. 

17""" 

18 

19import logging 

20import struct 

21from datetime import timezone 

22 

23from cryptography.hazmat.primitives import hashes, serialization 

24from cryptography.hazmat.primitives.asymmetric import ec, rsa 

25from cryptography.x509 import ( 

26 Certificate, 

27 ExtendedKeyUsage, 

28 ExtensionNotFound, 

29 PrecertificateSignedCertificateTimestamps, 

30) 

31from cryptography.x509.certificate_transparency import ( 

32 LogEntryType, 

33 SignedCertificateTimestamp, 

34) 

35from cryptography.x509.oid import ExtendedKeyUsageOID 

36 

37from sigstore._internal.trust import CTKeyring 

38from sigstore._utils import ( 

39 KeyID, 

40 cert_is_ca, 

41 key_id, 

42) 

43from sigstore.errors import VerificationError 

44 

45_logger = logging.getLogger(__name__) 

46 

47 

48def _pack_signed_entry( 

49 sct: SignedCertificateTimestamp, cert: Certificate, issuer_key_id: bytes | None 

50) -> bytes: 

51 fields = [] 

52 if sct.entry_type == LogEntryType.X509_CERTIFICATE: 

53 # When dealing with a "normal" certificate, our signed entry looks like this: 

54 # 

55 # [0]: opaque ASN.1Cert<1..2^24-1> 

56 pack_format = "!BBB{cert_der_len}s" 

57 cert_der = cert.public_bytes(encoding=serialization.Encoding.DER) 

58 elif sct.entry_type == LogEntryType.PRE_CERTIFICATE: 

59 if not issuer_key_id or len(issuer_key_id) != 32: 

60 raise VerificationError("API misuse: issuer key ID missing") 

61 

62 # When dealing with a precertificate, our signed entry looks like this: 

63 # 

64 # [0]: issuer_key_id[32] 

65 # [1]: opaque TBSCertificate<1..2^24-1> 

66 pack_format = "!32sBBB{cert_der_len}s" 

67 

68 # Precertificates must have their SCT list extension filtered out. 

69 cert_der = cert.tbs_precertificate_bytes 

70 fields.append(issuer_key_id) 

71 else: 

72 raise VerificationError(f"unknown SCT log entry type: {sct.entry_type!r}") 

73 

74 # The `opaque` length is a u24, which isn't directly supported by `struct`. 

75 # So we have to decompose it into 3 bytes. 

76 unused, len1, len2, len3 = struct.unpack( 

77 "!4B", 

78 struct.pack("!I", len(cert_der)), 

79 ) 

80 if unused: 

81 raise VerificationError( 

82 f"Unexpectedly large certificate length: {len(cert_der)}" 

83 ) 

84 

85 pack_format = pack_format.format(cert_der_len=len(cert_der)) 

86 fields.extend((len1, len2, len3, cert_der)) 

87 

88 return struct.pack(pack_format, *fields) 

89 

90 

91def _pack_digitally_signed( 

92 sct: SignedCertificateTimestamp, 

93 cert: Certificate, 

94 issuer_key_id: KeyID | None, 

95) -> bytes: 

96 """ 

97 Packs the contents of `cert` (and some pieces of `sct`) into a structured 

98 blob, one that forms the signature body of the "digitally-signed" struct 

99 for an SCT. 

100 

101 The format of the digitally signed data is described in IETF's RFC 6962. 

102 """ 

103 

104 # This constructs the "core" `signed_entry` field, which is either 

105 # the public bytes of the cert *or* the TBSPrecertificate (with some 

106 # filtering), depending on whether our SCT is for a precertificate. 

107 signed_entry = _pack_signed_entry(sct, cert, issuer_key_id) 

108 

109 # Assemble a format string with the certificate length baked in and then pack the digitally 

110 # signed data 

111 # fmt: off 

112 pattern = f"!BBQH{len(signed_entry)}sH{len(sct.extension_bytes)}s" 

113 timestamp = sct.timestamp.replace(tzinfo=timezone.utc) 

114 data = struct.pack( 

115 pattern, 

116 sct.version.value, # sct_version 

117 0, # signature_type (certificate_timestamp(0)) 

118 int(timestamp.timestamp() * 1000), # timestamp (milliseconds) 

119 sct.entry_type.value, # entry_type (x509_entry(0) | precert_entry(1)) 

120 signed_entry, # select(entry_type) -> signed_entry (see above) 

121 len(sct.extension_bytes), # extensions (opaque CtExtensions<0..2^16-1>) 

122 sct.extension_bytes, 

123 ) 

124 # fmt: on 

125 

126 return data 

127 

128 

129def _is_preissuer(issuer: Certificate) -> bool: 

130 try: 

131 ext_key_usage = issuer.extensions.get_extension_for_class(ExtendedKeyUsage) 

132 # If we do not have any EKU, we certainly do not have CT Ext 

133 except ExtensionNotFound: 

134 return False 

135 

136 return ExtendedKeyUsageOID.CERTIFICATE_TRANSPARENCY in ext_key_usage.value 

137 

138 

139def _get_issuer_cert(chain: list[Certificate]) -> Certificate: 

140 issuer = chain[0] 

141 if _is_preissuer(issuer): 

142 issuer = chain[1] 

143 return issuer 

144 

145 

146def _get_signed_certificate_timestamp( 

147 certificate: Certificate, 

148) -> SignedCertificateTimestamp: 

149 """Retrieve the embedded SCT from the certificate. 

150 

151 Raise VerificationError if certificate does not contain exactly one SCT 

152 """ 

153 try: 

154 timestamps = certificate.extensions.get_extension_for_class( 

155 PrecertificateSignedCertificateTimestamps 

156 ).value 

157 except ExtensionNotFound: 

158 raise VerificationError( 

159 "Certificate does not contain a signed certificate timestamp extension" 

160 ) 

161 

162 if len(timestamps) != 1: 

163 raise VerificationError( 

164 f"Expected one certificate timestamp, found {len(timestamps)}" 

165 ) 

166 sct: SignedCertificateTimestamp = timestamps[0] 

167 return sct 

168 

169 

170def _cert_is_ca(cert: Certificate) -> bool: 

171 _logger.debug(f"Found {cert.subject} as issuer, verifying if it is a ca") 

172 try: 

173 cert_is_ca(cert) 

174 except VerificationError as e: 

175 _logger.debug(f"Invalid {cert.subject}: failed to validate as a CA: {e}") 

176 return False 

177 return True 

178 

179 

180def verify_sct( 

181 cert: Certificate, 

182 chain: list[Certificate], 

183 ct_keyring: CTKeyring, 

184) -> None: 

185 """ 

186 Verify a signed certificate timestamp. 

187 

188 An SCT is verified by reconstructing its "digitally-signed" payload 

189 and verifying that the signature provided in the SCT is valid against 

190 one of the keys present in the CT keyring (i.e., the keys used by the CT 

191 log to sign SCTs). 

192 """ 

193 

194 sct = _get_signed_certificate_timestamp(cert) 

195 

196 issuer_key_id = None 

197 if sct.entry_type == LogEntryType.PRE_CERTIFICATE: 

198 # If we're verifying an SCT for a precertificate, we need to 

199 # find its issuer in the chain and calculate a hash over 

200 # its public key information, as part of the "binding" proof 

201 # that ties the issuer to the final certificate. 

202 issuer_cert = _get_issuer_cert(chain) 

203 issuer_pubkey = issuer_cert.public_key() 

204 

205 if not _cert_is_ca(issuer_cert): 

206 raise VerificationError( 

207 f"SCT verify: Invalid issuer pubkey basicConstraint (not a CA): {issuer_pubkey}" 

208 ) 

209 

210 if not isinstance(issuer_pubkey, rsa.RSAPublicKey | ec.EllipticCurvePublicKey): 

211 raise VerificationError( 

212 f"SCT verify: invalid issuer pubkey format (not ECDSA or RSA): {issuer_pubkey}" 

213 ) 

214 

215 issuer_key_id = key_id(issuer_pubkey) 

216 

217 digitally_signed = _pack_digitally_signed(sct, cert, issuer_key_id) 

218 

219 if not isinstance(sct.signature_hash_algorithm, hashes.SHA256): 

220 raise VerificationError( 

221 "Found unexpected hash algorithm in SCT: only SHA256 is supported " 

222 f"(expected {hashes.SHA256}, got {sct.signature_hash_algorithm})" 

223 ) 

224 

225 try: 

226 _logger.debug(f"attempting to verify SCT with key ID {sct.log_id.hex()}") 

227 # NOTE(ww): In terms of the DER structure, the SCT's `LogID` contains a 

228 # singular `opaque key_id[32]`. Cryptography's APIs don't bother 

229 # to expose this trivial single member, so we use the `log_id` 

230 # attribute directly. 

231 ct_keyring.verify( 

232 key_id=KeyID(sct.log_id), signature=sct.signature, data=digitally_signed 

233 ) 

234 except VerificationError as exc: 

235 raise VerificationError(f"SCT verify failed: {exc}")