1# Copyright 2022 The Sigstore Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Utilities for verifying signed certificate timestamps.
17"""
18
19import logging
20import struct
21from datetime import timezone
22
23from cryptography.hazmat.primitives import hashes, serialization
24from cryptography.hazmat.primitives.asymmetric import ec, rsa
25from cryptography.x509 import (
26 Certificate,
27 ExtendedKeyUsage,
28 ExtensionNotFound,
29 PrecertificateSignedCertificateTimestamps,
30)
31from cryptography.x509.certificate_transparency import (
32 LogEntryType,
33 SignedCertificateTimestamp,
34)
35from cryptography.x509.oid import ExtendedKeyUsageOID
36
37from sigstore._internal.trust import CTKeyring
38from sigstore._utils import (
39 KeyID,
40 cert_is_ca,
41 key_id,
42)
43from sigstore.errors import VerificationError
44
45_logger = logging.getLogger(__name__)
46
47
def _pack_signed_entry(
    sct: SignedCertificateTimestamp, cert: Certificate, issuer_key_id: bytes | None
) -> bytes:
    """
    Serialize the `signed_entry` portion of an SCT's signed payload.

    For an X.509 entry the result is the DER-encoded certificate preceded by
    a 24-bit big-endian length. For a precertificate entry it is the 32-byte
    issuer key ID, followed by the precertificate TBS bytes preceded by a
    24-bit big-endian length.

    Raises `VerificationError` if the entry type is neither of the two above,
    if a precertificate entry is requested without a 32-byte issuer key ID,
    or if the certificate body is too long for a 24-bit length.
    """
    leading: list = []
    if sct.entry_type == LogEntryType.X509_CERTIFICATE:
        # Layout: opaque ASN.1Cert<1..2^24-1>
        layout = "!BBB{cert_der_len}s"
        body = cert.public_bytes(encoding=serialization.Encoding.DER)
    elif sct.entry_type == LogEntryType.PRE_CERTIFICATE:
        if issuer_key_id is None or len(issuer_key_id) != 32:
            raise VerificationError("API misuse: issuer key ID missing")

        # Layout: issuer_key_id[32], then opaque TBSCertificate<1..2^24-1>
        layout = "!32sBBB{cert_der_len}s"

        # The TBS bytes used here have the SCT list extension filtered out.
        body = cert.tbs_precertificate_bytes
        leading.append(issuer_key_id)
    else:
        raise VerificationError(f"unknown SCT log entry type: {sct.entry_type!r}")

    # `struct` has no u24 code, so the length is packed as a big-endian u32
    # and split into its four bytes; the top byte must be zero for the value
    # to fit in the remaining three.
    overflow, len_hi, len_mid, len_lo = struct.pack("!I", len(body))
    if overflow:
        raise VerificationError(
            f"Unexpectedly large certificate length: {len(body)}"
        )

    return struct.pack(
        layout.format(cert_der_len=len(body)),
        *leading,
        len_hi,
        len_mid,
        len_lo,
        body,
    )
89
90
def _pack_digitally_signed(
    sct: SignedCertificateTimestamp,
    cert: Certificate,
    issuer_key_id: KeyID | None,
) -> bytes:
    """
    Packs the contents of `cert` (and some pieces of `sct`) into a structured
    blob, one that forms the signature body of the "digitally-signed" struct
    for an SCT.

    The format of the digitally signed data is described in IETF's RFC 6962.

    `issuer_key_id` is forwarded to `_pack_signed_entry`, which requires it
    for precertificate entries. Any `VerificationError` raised there
    propagates to the caller.
    """
    # Local import: the module-level import only brings in `timezone`.
    from datetime import datetime, timedelta

    # This constructs the "core" `signed_entry` field, which is either
    # the public bytes of the cert *or* the TBSPrecertificate (with some
    # filtering), depending on whether our SCT is for a precertificate.
    signed_entry = _pack_signed_entry(sct, cert, issuer_key_id)

    # Milliseconds since the Unix epoch, computed with exact integer
    # arithmetic. Going through `datetime.timestamp()` yields a float, and
    # truncating `float * 1000` can land one millisecond short
    # (e.g. `int(1.001 * 1000) == 1000`), which would make a valid SCT
    # signature fail to verify.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    timestamp = sct.timestamp.replace(tzinfo=timezone.utc)
    timestamp_ms = (timestamp - epoch) // timedelta(milliseconds=1)

    extension_bytes = sct.extension_bytes

    # Assemble a format string with the signed entry and extension lengths
    # baked in and then pack the digitally signed data
    # fmt: off
    pattern = f"!BBQH{len(signed_entry)}sH{len(extension_bytes)}s"
    data = struct.pack(
        pattern,
        sct.version.value,     # sct_version
        0,                     # signature_type (certificate_timestamp(0))
        timestamp_ms,          # timestamp (milliseconds)
        sct.entry_type.value,  # entry_type (x509_entry(0) | precert_entry(1))
        signed_entry,          # select(entry_type) -> signed_entry (see above)
        len(extension_bytes),  # extensions (opaque CtExtensions<0..2^16-1>)
        extension_bytes,
    )
    # fmt: on

    return data
127
128
def _is_preissuer(issuer: Certificate) -> bool:
    """
    Report whether `issuer` lists the Certificate Transparency extended key usage.

    Returns `False` when the certificate has no ExtendedKeyUsage extension at
    all, or when that extension does not contain the CT OID.
    """
    try:
        eku = issuer.extensions.get_extension_for_class(ExtendedKeyUsage)
    except ExtensionNotFound:
        # Without any EKU extension there cannot be a CT usage either.
        has_ct_usage = False
    else:
        has_ct_usage = ExtendedKeyUsageOID.CERTIFICATE_TRANSPARENCY in eku.value
    return has_ct_usage
137
138
def _get_issuer_cert(chain: list[Certificate]) -> Certificate:
    """
    Pick the issuing certificate out of `chain`.

    The first entry of `chain` is returned unless `_is_preissuer` reports
    that it carries the Certificate Transparency extended key usage, in which
    case the entry after it is returned instead.

    Raises `VerificationError` if `chain` is empty, or if its only entry is a
    preissuer, so callers see a verification failure rather than a bare
    `IndexError`.
    """
    if not chain:
        raise VerificationError(
            "SCT verify: certificate chain is empty, cannot determine issuer"
        )

    issuer = chain[0]
    if _is_preissuer(issuer):
        if len(chain) < 2:
            raise VerificationError(
                "SCT verify: certificate chain ends at a preissuer, "
                "cannot determine issuer"
            )
        issuer = chain[1]
    return issuer
144
145
def _get_signed_certificate_timestamp(
    certificate: Certificate,
) -> SignedCertificateTimestamp:
    """Retrieve the embedded SCT from the certificate.

    Raise VerificationError if certificate does not contain exactly one SCT:
    either the SCT list extension is missing altogether, or it holds a number
    of entries other than one.
    """
    try:
        timestamps = certificate.extensions.get_extension_for_class(
            PrecertificateSignedCertificateTimestamps
        ).value
    except ExtensionNotFound as exc:
        # Chain explicitly so the underlying lookup failure is recorded as
        # the direct cause of the verification error.
        raise VerificationError(
            "Certificate does not contain a signed certificate timestamp extension"
        ) from exc

    if len(timestamps) != 1:
        raise VerificationError(
            f"Expected one certificate timestamp, found {len(timestamps)}"
        )
    sct: SignedCertificateTimestamp = timestamps[0]
    return sct
168
169
def _cert_is_ca(cert: Certificate) -> bool:
    """
    Return whether `cert` passes the `cert_is_ca` check, without raising.

    A `VerificationError` raised by `cert_is_ca` is logged at debug level and
    reported as `False`. Otherwise the result of `cert_is_ca` itself decides
    the answer, so a certificate that the helper reports as "not a CA"
    without raising is rejected too.
    """
    _logger.debug(f"Found {cert.subject} as issuer, verifying if it is a ca")
    try:
        # NOTE(review): this relies on `sigstore._utils.cert_is_ca` returning
        # a bool (True for a CA) in addition to raising on invalid
        # certificates; confirm against its definition.
        is_ca = cert_is_ca(cert)
    except VerificationError as e:
        _logger.debug(f"Invalid {cert.subject}: failed to validate as a CA: {e}")
        return False

    if not is_ca:
        _logger.debug(f"Invalid {cert.subject}: not a CA certificate")
    return is_ca
178
179
def verify_sct(
    cert: Certificate,
    chain: list[Certificate],
    ct_keyring: CTKeyring,
) -> None:
    """
    Verify a signed certificate timestamp.

    An SCT is verified by reconstructing its "digitally-signed" payload
    and verifying that the signature provided in the SCT is valid against
    one of the keys present in the CT keyring (i.e., the keys used by the CT
    log to sign SCTs).

    `cert` is the certificate carrying the embedded SCT, `chain` is searched
    for the issuer when the SCT is for a precertificate, and `ct_keyring`
    performs the actual signature check.

    Returns `None` on success. Raises `VerificationError` if:

    * `cert` does not embed exactly one SCT;
    * the SCT is for a precertificate and the issuer is not a CA, or its
      public key is neither RSA nor elliptic-curve;
    * the SCT payload cannot be packed;
    * the SCT's signature hash algorithm is not SHA256;
    * the keyring rejects the signature.
    """

    sct = _get_signed_certificate_timestamp(cert)

    issuer_key_id = None
    if sct.entry_type == LogEntryType.PRE_CERTIFICATE:
        # If we're verifying an SCT for a precertificate, we need to
        # find its issuer in the chain and calculate a hash over
        # its public key information, as part of the "binding" proof
        # that ties the issuer to the final certificate.
        issuer_cert = _get_issuer_cert(chain)
        issuer_pubkey = issuer_cert.public_key()

        if not _cert_is_ca(issuer_cert):
            raise VerificationError(
                f"SCT verify: Invalid issuer pubkey basicConstraint (not a CA): {issuer_pubkey}"
            )

        if not isinstance(issuer_pubkey, rsa.RSAPublicKey | ec.EllipticCurvePublicKey):
            raise VerificationError(
                f"SCT verify: invalid issuer pubkey format (not ECDSA or RSA): {issuer_pubkey}"
            )

        issuer_key_id = key_id(issuer_pubkey)

    digitally_signed = _pack_digitally_signed(sct, cert, issuer_key_id)

    if not isinstance(sct.signature_hash_algorithm, hashes.SHA256):
        raise VerificationError(
            "Found unexpected hash algorithm in SCT: only SHA256 is supported "
            f"(expected {hashes.SHA256}, got {sct.signature_hash_algorithm})"
        )

    # Logged outside the `try` so that the guarded region covers only the
    # keyring call whose failure is being re-labelled below.
    _logger.debug(f"attempting to verify SCT with key ID {sct.log_id.hex()}")
    try:
        # NOTE(ww): In terms of the DER structure, the SCT's `LogID` contains a
        # singular `opaque key_id[32]`. Cryptography's APIs don't bother
        # to expose this trivial single member, so we use the `log_id`
        # attribute directly.
        ct_keyring.verify(
            key_id=KeyID(sct.log_id), signature=sct.signature, data=digitally_signed
        )
    except VerificationError as exc:
        # Re-raise with SCT context, keeping the keyring error as the cause.
        raise VerificationError(f"SCT verify failed: {exc}") from exc