Coverage for /pythoncovmergedfiles/medio/medio/src/model-transparency/tests/fuzzing/fuzz_with_cert_chain.py: 71%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

154 statements  

1###### Coverage stub 

2import atexit 

3import coverage 

4cov = coverage.coverage(data_file='.coverage', cover_pylib=True) 

5cov.start() 

6# Register an exist handler that will print coverage 

7def exit_handler(): 

8 cov.stop() 

9 cov.save() 

10atexit.register(exit_handler) 

11####### End of coverage stub 

12# Copyright 2025 The Sigstore Authors 

13# 

14# Licensed under the Apache License, Version 2.0 (the "License"); 

15# you may not use this file except in compliance with the License. 

16# You may obtain a copy of the License at 

17# 

18# http://www.apache.org/licenses/LICENSE-2.0 

19# 

20# Unless required by applicable law or agreed to in writing, software 

21# distributed under the License is distributed on an "AS IS" BASIS, 

22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

23# See the License for the specific language governing permissions and 

24# limitations under the License. 

25 

26import datetime as dt 

27from pathlib import Path 

28import shutil 

29import sys 

30import tempfile 

31 

32import atheris # type: ignore 

33from cryptography import x509 

34from cryptography.exceptions import UnsupportedAlgorithm 

35from cryptography.hazmat.primitives import hashes 

36from cryptography.hazmat.primitives import serialization 

37from cryptography.hazmat.primitives.asymmetric import ec 

38from cryptography.hazmat.primitives.asymmetric import rsa 

39from cryptography.x509.oid import ExtendedKeyUsageOID 

40from cryptography.x509.oid import NameOID 

41from utils import create_fuzz_files 

42 

43from model_signing import hashing 

44from model_signing import signing 

45from model_signing import verifying 

46 

47 

48def _rand_utf8( 

49 fdp: atheris.FuzzedDataProvider, min_len: int = 1, max_len: int = 32 

50) -> str: 

51 n = fdp.ConsumeIntInRange(min_len, max_len) 

52 data = fdp.ConsumeBytes(n) 

53 if not data: 

54 return "x" 

55 s = "".join(chr(32 + (c % 95)) for c in data).strip() 

56 return s or "x" 

57 

58 

59def gen_private_key(fdp: atheris.FuzzedDataProvider): 

60 """Generate RSA or EC private key using fuzz data (for CAs).""" 

61 if fdp.ConsumeBool(): 

62 curve = fdp.PickValueInList( 

63 [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()] 

64 ) 

65 return ec.generate_private_key(curve) 

66 key_size = fdp.PickValueInList([1024, 2048]) 

67 return rsa.generate_private_key(public_exponent=65537, key_size=key_size) 

68 

69 

70def gen_ec_key(fdp: atheris.FuzzedDataProvider): 

71 """Generate an EC private key (for the leaf).""" 

72 curve = fdp.PickValueInList( 

73 [ec.SECP256R1(), ec.SECP384R1(), ec.SECP521R1()] 

74 ) 

75 return ec.generate_private_key(curve) 

76 

77 

78def gen_name(fdp: atheris.FuzzedDataProvider) -> x509.Name: 

79 attrs = [x509.NameAttribute(NameOID.COMMON_NAME, _rand_utf8(fdp, 3, 40))] 

80 if fdp.ConsumeBool(): 

81 attrs.append( 

82 x509.NameAttribute( 

83 NameOID.ORGANIZATION_NAME, _rand_utf8(fdp, 2, 20) 

84 ) 

85 ) 

86 if fdp.ConsumeBool(): 

87 attrs.append( 

88 x509.NameAttribute( 

89 NameOID.ORGANIZATIONAL_UNIT_NAME, _rand_utf8(fdp, 2, 20) 

90 ) 

91 ) 

92 if fdp.ConsumeBool(): 

93 cb = fdp.ConsumeBytes(2) or b"US" 

94 country = "".join(chr(ord("A") + (b % 26)) for b in cb) 

95 attrs.append(x509.NameAttribute(NameOID.COUNTRY_NAME, country)) 

96 if fdp.ConsumeBool(): 

97 attrs.append( 

98 x509.NameAttribute(NameOID.LOCALITY_NAME, _rand_utf8(fdp, 2, 20)) 

99 ) 

100 return x509.Name(attrs) 

101 

102 

103def _ski(public_key) -> x509.SubjectKeyIdentifier: 

104 return x509.SubjectKeyIdentifier.from_public_key(public_key) 

105 

106 

107def deterministic_serial(fdp: atheris.FuzzedDataProvider) -> int: 

108 """Deterministic, positive serial (≤159 bits, non-zero) from input.""" 

109 length = fdp.ConsumeIntInRange(1, 20) 

110 b = fdp.ConsumeBytes(length) 

111 if not b: 

112 b = b"\x01" 

113 val = int.from_bytes(b, "big") & ((1 << 159) - 1) 

114 return val or 1 

115 

116 

117def deterministic_validity( 

118 fdp: atheris.FuzzedDataProvider, 

119) -> tuple[dt.datetime, dt.datetime]: 

120 """Validity window derived solely from fuzz input (no wall clock).""" 

121 base = dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc) 

122 start_days = fdp.ConsumeIntInRange(0, 9000) 

123 not_before = base + dt.timedelta(days=start_days) 

124 lifetime_days = fdp.ConsumeIntInRange(30, 3650) 

125 not_after = not_before + dt.timedelta(days=lifetime_days) 

126 return not_before, not_after 

127 

128 

129def _pick_sig_hash(fdp: atheris.FuzzedDataProvider): 

130 return fdp.PickValueInList( 

131 [hashes.SHA256(), hashes.SHA384(), hashes.SHA512()] 

132 ) 

133 

134 

135def build_valid_chain( 

136 fdp: atheris.FuzzedDataProvider, 

137) -> tuple[x509.Certificate, object, list[x509.Certificate]]: 

138 """Build a valid chain: root -> 0..3 intermediates -> leaf (depth 1..5). 

139 

140 Returns (leaf_cert, leaf_key, issuers_chain) where issuers_chain is 

141 [nearest_intermediate, ..., root] and does NOT include the leaf. 

142 """ 

143 depth = fdp.ConsumeIntInRange(1, 5) 

144 not_before, not_after = deterministic_validity(fdp) 

145 

146 # Root CA 

147 root_key = gen_private_key(fdp) 

148 root_name = gen_name(fdp) 

149 root_builder = ( 

150 x509.CertificateBuilder() 

151 .subject_name(root_name) 

152 .issuer_name(root_name) 

153 .public_key(root_key.public_key()) 

154 .serial_number(deterministic_serial(fdp)) 

155 .not_valid_before(not_before) 

156 .not_valid_after(not_after) 

157 .add_extension( 

158 x509.BasicConstraints(ca=True, path_length=depth - 1), critical=True 

159 ) 

160 .add_extension( 

161 x509.KeyUsage( 

162 digital_signature=False, 

163 content_commitment=False, 

164 key_encipherment=False, 

165 data_encipherment=False, 

166 key_agreement=False, 

167 key_cert_sign=True, 

168 crl_sign=True, 

169 encipher_only=False, 

170 decipher_only=False, 

171 ), 

172 critical=True, 

173 ) 

174 .add_extension(_ski(root_key.public_key()), critical=False) 

175 ) 

176 root_cert = root_builder.sign( 

177 private_key=root_key, algorithm=_pick_sig_hash(fdp) 

178 ) 

179 

180 issuer_key = root_key 

181 issuer_cert = root_cert 

182 issuers: list[x509.Certificate] = [root_cert] 

183 

184 # Intermediates 

185 for i in range(depth - 1): 

186 key = gen_private_key(fdp) 

187 name = gen_name(fdp) 

188 inter_builder = ( 

189 x509.CertificateBuilder() 

190 .subject_name(name) 

191 .issuer_name(issuer_cert.subject) 

192 .public_key(key.public_key()) 

193 .serial_number(deterministic_serial(fdp)) 

194 .not_valid_before(not_before) 

195 .not_valid_after(not_after) 

196 .add_extension( 

197 x509.BasicConstraints(ca=True, path_length=(depth - 2 - i)), 

198 critical=True, 

199 ) 

200 .add_extension( 

201 x509.KeyUsage( 

202 digital_signature=False, 

203 content_commitment=False, 

204 key_encipherment=False, 

205 data_encipherment=False, 

206 key_agreement=False, 

207 key_cert_sign=True, 

208 crl_sign=True, 

209 encipher_only=False, 

210 decipher_only=False, 

211 ), 

212 critical=True, 

213 ) 

214 .add_extension(_ski(key.public_key()), critical=False) 

215 .add_extension( 

216 x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( 

217 x509.SubjectKeyIdentifier.from_public_key( 

218 issuer_key.public_key() 

219 ) 

220 ), 

221 critical=False, 

222 ) 

223 ) 

224 inter_cert = inter_builder.sign( 

225 private_key=issuer_key, algorithm=_pick_sig_hash(fdp) 

226 ) 

227 issuer_key = key 

228 issuer_cert = inter_cert 

229 issuers.insert(0, inter_cert) # nearest first 

230 

231 # Leaf (code signing) — ALWAYS EC to satisfy signer expectations 

232 leaf_key = gen_ec_key(fdp) 

233 leaf_name = gen_name(fdp) 

234 leaf_builder = ( 

235 x509.CertificateBuilder() 

236 .subject_name(leaf_name) 

237 .issuer_name(issuer_cert.subject) 

238 .public_key(leaf_key.public_key()) 

239 .serial_number(deterministic_serial(fdp)) 

240 .not_valid_before(not_before) 

241 .not_valid_after(not_after) 

242 .add_extension( 

243 x509.BasicConstraints(ca=False, path_length=None), critical=True 

244 ) 

245 .add_extension( 

246 x509.KeyUsage( 

247 digital_signature=True, 

248 content_commitment=True, 

249 key_encipherment=False, 

250 data_encipherment=False, 

251 key_agreement=False, 

252 key_cert_sign=False, 

253 crl_sign=False, 

254 encipher_only=False, 

255 decipher_only=False, 

256 ), 

257 critical=True, 

258 ) 

259 .add_extension( 

260 x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CODE_SIGNING]), 

261 critical=False, 

262 ) 

263 .add_extension(_ski(leaf_key.public_key()), critical=False) 

264 .add_extension( 

265 x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier( 

266 x509.SubjectKeyIdentifier.from_public_key( 

267 issuer_key.public_key() 

268 ) 

269 ), 

270 critical=False, 

271 ) 

272 ) 

273 leaf_cert = leaf_builder.sign( 

274 private_key=issuer_key, algorithm=_pick_sig_hash(fdp) 

275 ) 

276 

277 return leaf_cert, leaf_key, issuers 

278 

279 

280def to_pem_cert(cert: x509.Certificate) -> bytes: 

281 return cert.public_bytes(encoding=serialization.Encoding.PEM) 

282 

283 

284def key_to_pem(priv: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey) -> bytes: 

285 return priv.private_bytes( 

286 encoding=serialization.Encoding.PEM, 

287 format=serialization.PrivateFormat.PKCS8, 

288 encryption_algorithm=serialization.NoEncryption(), 

289 ) 

290 

291 

292def _build_hashing_config_from_fdp( 

293 fdp: atheris.FuzzedDataProvider, 

294 extra_ignores: list[Path], 

295 signature_path: Path, 

296) -> hashing.Config: 

297 alg = ["sha256", "blake2", "blake3"][fdp.ConsumeIntInRange(0, 2)] 

298 hcfg = hashing.Config().set_ignored_paths( 

299 paths=[*list(extra_ignores), signature_path], 

300 ignore_git_paths=fdp.ConsumeBool(), 

301 ) 

302 if fdp.ConsumeBool(): 

303 hcfg.use_file_serialization(hashing_algorithm=alg) 

304 else: 

305 hcfg.use_shard_serialization(hashing_algorithm=alg) 

306 return hcfg 

307 

308 

309def TestOneInput(data: bytes): 

310 fdp = atheris.FuzzedDataProvider(data) 

311 

312 # 1) Build certs & keys (catch x509 construction errors) 

313 try: 

314 leaf_cert, leaf_key, issuers = build_valid_chain(fdp) 

315 except ( 

316 ValueError, 

317 TypeError, 

318 x509.DuplicateExtension, 

319 x509.UnsupportedGeneralNameType, 

320 UnsupportedAlgorithm, 

321 ): 

322 return # skip this testcase; invalid X.509 

323 

324 # 2) Convert to PEM and write to disk (catch serialization errors) 

325 workdir = tempfile.mkdtemp(prefix="fuzz_cert_") 

326 try: 

327 leaf_key_path = Path(workdir) / "leaf-key.pem" 

328 leaf_cert_path = Path(workdir) / "leaf-cert.pem" 

329 chain_paths: list[Path] = [] 

330 

331 with open(leaf_key_path, "wb") as f: 

332 f.write(key_to_pem(leaf_key)) 

333 with open(leaf_cert_path, "wb") as f: 

334 f.write(to_pem_cert(leaf_cert)) 

335 

336 for idx, cert in enumerate(issuers): 

337 p = Path(workdir) / f"chain-{idx}.pem" 

338 with open(p, "wb") as f: 

339 f.write(to_pem_cert(cert)) 

340 chain_paths.append(p) 

341 

342 # Check chain length before signing and verifying 

343 if len(chain_paths) <= 1: 

344 shutil.rmtree(workdir, ignore_errors=True) 

345 return 

346 

347 except (ValueError, TypeError, UnsupportedAlgorithm): 

348 shutil.rmtree(workdir, ignore_errors=True) 

349 return 

350 

351 # 3) Create model files 

352 model_path_dir = tempfile.mkdtemp(prefix="fuzz_model_") 

353 model_path_p = Path(model_path_dir) 

354 created_files = create_fuzz_files(model_path_p, fdp) 

355 if created_files == 0: 

356 return 

357 

358 # Signature output path (we ignore this when signing and verifying) 

359 fname = f"signature-{_rand_utf8(fdp, 3, 12).replace('/', '_')}.sig" 

360 signature_path = model_path_p / fname 

361 

362 # Ignores (collected for hashing config) 

363 extra_ignores: list[Path] = [] 

364 

365 # Build hashing config (serialization + algorithm + ignores) 

366 hcfg = _build_hashing_config_from_fdp(fdp, extra_ignores, signature_path) 

367 

368 # 4) Sign and 5) Verify 

369 try: 

370 signing.Config().use_certificate_signer( 

371 private_key=leaf_key_path, 

372 signing_certificate=leaf_cert_path, 

373 certificate_chain=chain_paths, 

374 ).set_hashing_config(hcfg).sign(model_path_p, signature_path) 

375 

376 verifying.Config().use_certificate_verifier( 

377 certificate_chain=chain_paths, log_fingerprints=False 

378 ).set_hashing_config(hcfg).verify(model_path_p, signature_path) 

379 

380 finally: 

381 # Always clean up temp dirs 

382 shutil.rmtree(model_path_dir, ignore_errors=True) 

383 shutil.rmtree(workdir, ignore_errors=True) 

384 

385 

386def main() -> None: 

387 atheris.instrument_all() 

388 atheris.Setup(sys.argv, TestOneInput) 

389 atheris.Fuzz() 

390 

391 

392if __name__ == "__main__": 

393 main()