Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/fuzz_with_cert_chain.py: 70%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

###### Coverage stub
import atexit
import coverage

# Start measuring as early as possible so the harness below is covered too.
cov = coverage.coverage(data_file='.coverage', cover_pylib=True)
cov.start()


# Register an exit handler that stops collection and saves the coverage data.
@atexit.register
def exit_handler():
    """Stop coverage measurement and write the collected data to disk."""
    cov.stop()
    cov.save()
####### End of coverage stub

12# Copyright 2025 The Sigstore Authors 

13# 

14# Licensed under the Apache License, Version 2.0 (the "License"); 

15# you may not use this file except in compliance with the License. 

16# You may obtain a copy of the License at 

17# 

18# http://www.apache.org/licenses/LICENSE-2.0 

19# 

20# Unless required by applicable law or agreed to in writing, software 

21# distributed under the License is distributed on an "AS IS" BASIS, 

22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

23# See the License for the specific language governing permissions and 

24# limitations under the License. 

25 

26import datetime as dt 

27from pathlib import Path 

28import shutil 

29import sys 

30import tempfile 

31 

32import atheris # type: ignore 

33from cryptography import x509 

34from cryptography.exceptions import UnsupportedAlgorithm 

35from cryptography.hazmat.primitives import hashes 

36from cryptography.hazmat.primitives import serialization 

37from cryptography.hazmat.primitives.asymmetric import ec 

38from cryptography.hazmat.primitives.asymmetric import rsa 

39from cryptography.x509.oid import ExtendedKeyUsageOID 

40from cryptography.x509.oid import NameOID 

41from utils import create_fuzz_files 

42 

43from model_signing import hashing 

44from model_signing import signing 

45from model_signing import verifying 

46 

47 

def _rand_utf8(
    fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 32
) -> str:
    """Return a short, non-empty printable-ASCII string drawn from fuzz input.

    Consumes one integer in ``[min_len, max_len]`` and then up to that many
    bytes. Each byte is folded into the 95 code points starting at the space
    character (32..126) and surrounding whitespace is stripped. ``"x"`` is
    returned when no bytes were available or nothing is left after stripping.
    """
    count = fdp.ConsumeIntInRange(min_len, max_len)
    raw = fdp.ConsumeBytes(count)
    if raw:
        text = "".join([chr(32 + byte % 95) for byte in raw]).strip()
        if text:
            return text
    return "x"

57 

58 

def gen_private_key(fdp: atheris.FuzzedDataProvider):
    """Generate an RSA or EC private key chosen by fuzz data (used for CAs).

    One boolean selects the key family; a second draw selects either the
    curve (P-256, P-384, P-521) or the RSA modulus size (1024 or 2048 bits).
    """
    wants_ec = fdp.ConsumeBool()
    if not wants_ec:
        bits = fdp.PickValueInList([1024, 2048])
        return rsa.generate_private_key(public_exponent=65537, key_size=bits)

    curves = [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
    return ec.generate_private_key(fdp.PickValueInList(curves))

68 

69 

def gen_ec_key(fdp: atheris.FuzzedDataProvider):
    """Generate an EC private key on a fuzz-selected NIST curve (for the leaf)."""
    curves = [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()]
    chosen = fdp.PickValueInList(curves)
    return ec.generate_private_key(chosen)

76 

77 

def gen_name(fdp: atheris.FuzzedDataProvider) -> x509.Name:
    """Build an X.509 name from fuzz input.

    A common name is always present. Organization, organizational unit,
    country and locality are each added only when the next fuzz boolean is
    true, in that order.
    """

    def _text_attr(oid, lo: int, hi: int) -> x509.NameAttribute:
        # Free-text attribute whose value comes from _rand_utf8.
        return x509.NameAttribute(oid, _rand_utf8(fdp, lo, hi))

    parts = [_text_attr(NameOID.COMMON_NAME, 3, 40)]

    if fdp.ConsumeBool():
        parts.append(_text_attr(NameOID.ORGANIZATION_NAME, 2, 20))

    if fdp.ConsumeBool():
        parts.append(_text_attr(NameOID.ORGANIZATIONAL_UNIT_NAME, 2, 20))

    if fdp.ConsumeBool():
        # Map each consumed byte onto an uppercase letter A..Z; fall back to
        # "US" when the fuzz input is exhausted.
        raw = fdp.ConsumeBytes(2) or b"US"
        letters = [chr(ord("A") + (byte % 26)) for byte in raw]
        parts.append(
            x509.NameAttribute(NameOID.COUNTRY_NAME, "".join(letters))
        )

    if fdp.ConsumeBool():
        parts.append(_text_attr(NameOID.LOCALITY_NAME, 2, 20))

    return x509.Name(parts)

101 

102 

def _ski(public_key) -> x509.SubjectKeyIdentifier:
    """Return the SubjectKeyIdentifier extension derived from *public_key*."""
    identifier = x509.SubjectKeyIdentifier.from_public_key(public_key)
    return identifier

105 

106 

def deterministic_serial(fdp: atheris.FuzzedDataProvider) -> int:
    """Derive a positive, non-zero serial number of at most 159 bits.

    Consumes a length in ``[1, 20]`` and then up to that many bytes, which
    are read as a big-endian integer and masked to the low 159 bits. A zero
    result (or exhausted input) is replaced by 1.
    """
    n_bytes = fdp.ConsumeIntInRange(1, 20)
    raw = fdp.ConsumeBytes(n_bytes) or b"\x01"
    mask = (1 << 159) - 1
    serial = int.from_bytes(raw, "big") & mask
    return serial if serial else 1

115 

116 

def deterministic_validity(
    fdp: atheris.FuzzedDataProvider,
) -> tuple[dt.datetime, dt.datetime]:
    """Return a ``(not_before, not_after)`` pair derived only from fuzz input.

    The start is 0..9000 days after 2000-01-01 UTC and the lifetime is
    30..3650 days; the wall clock is never consulted.
    """
    epoch = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc)
    offset = dt.timedelta(days=fdp.ConsumeIntInRange(0, 9000))
    lifetime = dt.timedelta(days=fdp.ConsumeIntInRange(30, 3650))
    start = epoch + offset
    return start, start + lifetime

127 

128 

def _pick_sig_hash(fdp: atheris.FuzzedDataProvider):
    """Pick the certificate signature hash (SHA-256/384/512) from fuzz input."""
    candidates = [hashes.SHA256(), hashes.SHA384(), hashes.SHA512()]
    return fdp.PickValueInList(candidates)

133 

134 

def _ca_key_usage() -> x509.KeyUsage:
    """Return the KeyUsage used by every CA here: cert and CRL signing only."""
    return x509.KeyUsage(
        digital_signature=False,
        content_commitment=False,
        key_encipherment=False,
        data_encipherment=False,
        key_agreement=False,
        key_cert_sign=True,
        crl_sign=True,
        encipher_only=False,
        decipher_only=False,
    )


def _aki(issuer_public_key) -> x509.AuthorityKeyIdentifier:
    """Return an AuthorityKeyIdentifier derived from the issuer's public key."""
    return x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(
        _ski(issuer_public_key)
    )


def build_valid_chain(
    fdp: atheris.FuzzedDataProvider,
) -> tuple[x509.Certificate, object, list[x509.Certificate]]:
    """Build a chain: root -> 0..4 intermediates -> leaf.

    ``depth`` (1..5) is the number of CA certificates: one self-signed root
    plus ``depth - 1`` intermediates. Each CA's ``path_length`` equals the
    number of CAs below it, so the last CA before the leaf gets 0. All
    certificates share one fuzz-derived validity window. CA keys are RSA or
    EC; the leaf key is always EC.

    Args:
        fdp: Fuzzed data provider. Values are consumed in a fixed order
            (depth, validity, then key/name/serial/hash per certificate).

    Returns:
        ``(leaf_cert, leaf_key, issuers_chain)`` where ``issuers_chain`` is
        ``[nearest_intermediate, ..., root]`` and does NOT include the leaf.

    Errors raised while building or signing certificates are not handled
    here; they propagate to the caller.
    """
    depth = fdp.ConsumeIntInRange(1, 5)
    not_before, not_after = deterministic_validity(fdp)

    # Root CA: self-signed, so subject and issuer are the same name.
    root_key = gen_private_key(fdp)
    root_name = gen_name(fdp)
    root_builder = (
        x509.CertificateBuilder()
        .subject_name(root_name)
        .issuer_name(root_name)
        .public_key(root_key.public_key())
        .serial_number(deterministic_serial(fdp))
        .not_valid_before(not_before)
        .not_valid_after(not_after)
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=depth - 1), critical=True
        )
        .add_extension(_ca_key_usage(), critical=True)
        .add_extension(_ski(root_key.public_key()), critical=False)
    )
    root_cert = root_builder.sign(
        private_key=root_key, algorithm=_pick_sig_hash(fdp)
    )

    issuer_key = root_key
    issuer_cert = root_cert
    issuers: list[x509.Certificate] = [root_cert]

    # Intermediates: each one is signed by the previously created CA.
    for i in range(depth - 1):
        key = gen_private_key(fdp)
        name = gen_name(fdp)
        inter_builder = (
            x509.CertificateBuilder()
            .subject_name(name)
            .issuer_name(issuer_cert.subject)
            .public_key(key.public_key())
            .serial_number(deterministic_serial(fdp))
            .not_valid_before(not_before)
            .not_valid_after(not_after)
            .add_extension(
                # Remaining CA levels below this intermediate.
                x509.BasicConstraints(ca=True, path_length=(depth - 2 - i)),
                critical=True,
            )
            .add_extension(_ca_key_usage(), critical=True)
            .add_extension(_ski(key.public_key()), critical=False)
            .add_extension(_aki(issuer_key.public_key()), critical=False)
        )
        inter_cert = inter_builder.sign(
            private_key=issuer_key, algorithm=_pick_sig_hash(fdp)
        )
        issuer_key = key
        issuer_cert = inter_cert
        issuers.insert(0, inter_cert)  # nearest first

    # Leaf (code signing) — ALWAYS EC to satisfy signer expectations
    leaf_key = gen_ec_key(fdp)
    leaf_name = gen_name(fdp)
    leaf_builder = (
        x509.CertificateBuilder()
        .subject_name(leaf_name)
        .issuer_name(issuer_cert.subject)
        .public_key(leaf_key.public_key())
        .serial_number(deterministic_serial(fdp))
        .not_valid_before(not_before)
        .not_valid_after(not_after)
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True
        )
        .add_extension(
            x509.KeyUsage(
                digital_signature=True,
                content_commitment=True,
                key_encipherment=False,
                data_encipherment=False,
                key_agreement=False,
                key_cert_sign=False,
                crl_sign=False,
                encipher_only=False,
                decipher_only=False,
            ),
            critical=True,
        )
        .add_extension(
            x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CODE_SIGNING]),
            critical=False,
        )
        .add_extension(_ski(leaf_key.public_key()), critical=False)
        .add_extension(_aki(issuer_key.public_key()), critical=False)
    )
    leaf_cert = leaf_builder.sign(
        private_key=issuer_key, algorithm=_pick_sig_hash(fdp)
    )

    return leaf_cert, leaf_key, issuers

278 

279 

def to_pem_cert(cert: x509.Certificate) -> bytes:
    """Serialize *cert* to PEM-encoded bytes."""
    pem = serialization.Encoding.PEM
    return cert.public_bytes(encoding=pem)

282 

283 

def key_to_pem(priv: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey) -> bytes:
    """Serialize a private key as unencrypted PKCS#8 in PEM encoding."""
    pem_options = {
        "encoding": serialization.Encoding.PEM,
        "format": serialization.PrivateFormat.PKCS8,
        "encryption_algorithm": serialization.NoEncryption(),
    }
    return priv.private_bytes(**pem_options)

290 

291 

def TestOneInput(data: bytes):
    """Fuzz entry point: sign and verify a model with a generated cert chain.

    Steps:
      1. Build a certificate chain and leaf key from the fuzz input.
      2. Write the leaf key, leaf certificate and issuer chain as PEM files.
      3. Create model files from the remaining fuzz input.
      4. Sign the model with the leaf key and certificate.
      5. Verify the signature against the issuer chain.

    Inputs that yield an unbuildable chain, a chain with fewer than two
    issuer certificates, or no model files are skipped. Exceptions raised by
    signing or verifying are deliberately not caught so the fuzzer reports
    them.

    Args:
        data: Raw bytes supplied by the fuzzing engine.
    """
    fdp = atheris.FuzzedDataProvider(data)

    # 1) Build certs & keys (catch x509 construction errors)
    try:
        leaf_cert, leaf_key, issuers = build_valid_chain(fdp)
    except (
        ValueError,
        TypeError,
        x509.DuplicateExtension,
        x509.UnsupportedGeneralNameType,
        UnsupportedAlgorithm,
    ):
        return  # skip this testcase; invalid X.509

    workdir = tempfile.mkdtemp(prefix="fuzz_cert_")
    model_path_dir = None
    # One try/finally covers every exit path below, so neither temp directory
    # outlives the call, including the early returns and unexpected errors.
    try:
        # 2) Convert to PEM and write to disk (catch serialization errors)
        leaf_key_path = Path(workdir) / "leaf-key.pem"
        leaf_cert_path = Path(workdir) / "leaf-cert.pem"
        chain_paths: list[Path] = []
        try:
            leaf_key_path.write_bytes(key_to_pem(leaf_key))
            leaf_cert_path.write_bytes(to_pem_cert(leaf_cert))
            for idx, cert in enumerate(issuers):
                p = Path(workdir) / f"chain-{idx}.pem"
                p.write_bytes(to_pem_cert(cert))
                chain_paths.append(p)
        except (ValueError, TypeError, UnsupportedAlgorithm):
            return

        # Check chain length before signing and verifying
        if len(chain_paths) <= 1:
            return

        # 3) Create model files
        model_path_dir = tempfile.mkdtemp(prefix="fuzz_model_")
        model_path_p = Path(model_path_dir)
        created_files = create_fuzz_files(model_path_p, fdp)
        if created_files == 0:
            return

        # Signature output path; it lives inside the model directory, so it
        # is put on the ignore list for both signing and verifying.
        fname = f"signature-{_rand_utf8(fdp, 3, 12).replace('/', '_')}.sig"
        signature_path = model_path_p / fname

        # Ignores
        ignore_git = fdp.ConsumeBool()
        extra_ignores: list[Path] = []

        # 4) Sign
        signing.Config().use_certificate_signer(
            private_key=leaf_key_path,
            signing_certificate=leaf_cert_path,
            certificate_chain=chain_paths,
        ).set_hashing_config(
            hashing.Config().set_ignored_paths(
                paths=[*extra_ignores, signature_path],
                ignore_git_paths=ignore_git,
            )
        ).sign(model_path_p, signature_path)

        # 5) Verify
        verifying.Config().use_certificate_verifier(
            certificate_chain=chain_paths, log_fingerprints=False
        ).set_hashing_config(
            hashing.Config().set_ignored_paths(
                paths=[*extra_ignores, signature_path],
                ignore_git_paths=ignore_git,
            )
        ).verify(model_path_p, signature_path)

    finally:
        # Always clean up temp dirs
        if model_path_dir is not None:
            shutil.rmtree(model_path_dir, ignore_errors=True)
        shutil.rmtree(workdir, ignore_errors=True)

375 

376 

def main() -> None:
    """Instrument all code, register the harness with Atheris and fuzz."""
    atheris.instrument_all()
    atheris.Setup(sys.argv, TestOneInput)
    atheris.Fuzz()


if __name__ == "__main__":
    main()