Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/tls/cert.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

924 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net> 

5# <arno@natisbad.org> 

6# 2015, 2016, 2017 Maxence Tury <maxence.tury@ssi.gouv.fr> 

7# 2022-2025 Gabriel Potter 

8 

9r""" 

10High-level methods for PKI objects (X.509 certificates, CRLs, CSR, Keys, CMS). 

11Supported keys include RSA, ECDSA and EdDSA. 

12 

13The classes below are wrappers for the ASN.1 objects defined in x509.py. 

14 

15Example 1: Certificate & Private key 

16____________________________________ 

17 

18For instance, here is what you could do in order to modify the subject public 

19key info of a 'cert' and then resign it with whatever 'key':: 

20 

21 >>> from scapy.layers.tls.cert import * 

22 >>> cert = Cert("cert.der") 

23 >>> k = PrivKeyRSA() # generate a private key 

24 >>> cert.setSubjectPublicKeyFromPrivateKey(k) 

25 >>> cert.resignWith(k) 

26 >>> cert.export("newcert.pem") 

27 >>> k.export("mykey.pem") 

28 

29One could also edit arguments like the serial number, as such:: 

30 

31 >>> from scapy.layers.tls.cert import * 

32 >>> c = Cert("mycert.pem") 

33 >>> c.tbsCertificate.serialNumber = 0x4B1D 

34 >>> k = PrivKey("mykey.pem") # import an existing private key 

35 >>> c.resignWith(k) 

36 >>> c.export("newcert.pem") 

37 

38To export the public key of a private key:: 

39 

40 >>> k = PrivKey("mykey.pem") 

41 >>> k.pubkey.export("mypubkey.pem") 

42 

43Example 2: CertList and CertTree 

44________________________________ 

45 

46Load a .pem file that contains multiple certificates:: 

47 

48 >>> l = CertList("ca_chain.pem") 

49 >>> l.show() 

50 0000 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test CA...] 

51 0001 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test Client...] 

52 

53Use 'CertTree' to organize the certificates in a tree:: 

54 

55 >>> tree = CertTree("ca_chain.pem") # or tree = CertTree(l) 

56 >>> tree.show() 

57 /C=FR/OU=Scapy Test PKI/CN=Scapy Test CA [Self Signed] 

58 /C=FR/OU=Scapy Test PKI/CN=Scapy Test Client [Not Self Signed] 

59 

60Example 3: Certificate Signing Request (CSR) 

61____________________________________________ 

62 

63Scapy's :py:class:`~scapy.layers.tls.cert.CSR` class supports both PKCS#10 and CMC 

64formats. 

65 

66Load and display a CSR:: 

67 

68 >>> csr = CSR("cert.req") 

69 >>> csr 

70 [CSR Format: CMC, Subject:/O=TestOrg/CN=TestCN, Verified: True] 

71 >>> csr.certReq.show() 

72 ###[ PKCS10_CertificationRequest ]### 

73 \certificationRequestInfo\ 

74 |###[ PKCS10_CertificationRequestInfo ]### 

75 | version = 0x0 <ASN1_INTEGER[0]> 

76 | \subject \ 

77 | |###[ X509_RDN ]### 

78 | | \rdn \ 

79 | | |###[ X509_AttributeTypeAndValue ]### 

80 | | | type = <ASN1_OID['organizationName']> 

81 | | | value = <ASN1_UTF8_STRING[b'TestOrg']> 

82 [...] 

83 

84Get its public key and verify its signature:: 

85 

86 >>> csr.pubkey 

87 <scapy.layers.tls.cert.PubKeyRSA at 0x7f3481149310> 

88 >>> csr.verifySelf() 

89 True 

90 

91No need for obnoxious openssl tweaking anymore. :) 

92""" 

93 

94import base64 

95import enum 

96import os 

97import time 

98import warnings 

99 

100from scapy.config import conf, crypto_validator 

101from scapy.compat import Self 

102from scapy.error import warning 

103from scapy.utils import binrepr 

104from scapy.asn1.asn1 import ( 

105 ASN1_BIT_STRING, 

106 ASN1_NULL, 

107 ASN1_OID, 

108 ASN1_STRING, 

109) 

110from scapy.asn1.mib import hash_by_oid 

111from scapy.packet import Packet 

112from scapy.layers.x509 import ( 

113 CMS_CertificateChoices, 

114 CMS_ContentInfo, 

115 CMS_EncapsulatedContentInfo, 

116 CMS_IssuerAndSerialNumber, 

117 CMS_RevocationInfoChoice, 

118 CMS_SignedAttrsForSignature, 

119 CMS_SignedData, 

120 CMS_SignerInfo, 

121 CMS_SubjectKeyIdentifier, 

122 ECDSAPrivateKey_OpenSSL, 

123 ECDSAPrivateKey, 

124 ECDSAPublicKey, 

125 EdDSAPrivateKey, 

126 EdDSAPublicKey, 

127 PKCS10_CertificationRequest, 

128 RSAPrivateKey_OpenSSL, 

129 RSAPrivateKey, 

130 RSAPublicKey, 

131 X509_AlgorithmIdentifier, 

132 X509_Attribute, 

133 X509_AttributeValue, 

134 X509_Cert, 

135 X509_CRL, 

136 X509_SubjectPublicKeyInfo, 

137) 

138from scapy.layers.tls.crypto.hash import _get_hash 

139from scapy.layers.tls.crypto.pkcs1 import ( 

140 _DecryptAndSignRSA, 

141 _EncryptAndVerifyRSA, 

142 pkcs_os2ip, 

143) 

144from scapy.compat import bytes_encode 

145 

146# Typing imports 

147from typing import ( 

148 List, 

149 Optional, 

150 Union, 

151) 

152 

# The cryptography-backed names below are only bound when the optional
# 'cryptography' dependency is usable (conf.crypto_valid).
if conf.crypto_valid:
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519, x448

    # cryptography raised the minimum RSA key length to 1024 in 43.0+
    # https://github.com/pyca/cryptography/pull/10278
    # but we still need 512 for EXPORT40 ciphers (yes EXPORT is terrible)
    # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66
    # The following detects the change and hacks around it using the backend

    try:
        # Probe: generating a 512-bit key raises ValueError when the library
        # enforces the larger minimum size.
        rsa.generate_private_key(public_exponent=65537, key_size=512)
        _RSA_512_SUPPORTED = True
    except ValueError:
        # cryptography > 43.0
        _RSA_512_SUPPORTED = False
        # rust_openssl is used by the RSA fill_and_store() methods to generate
        # keys shorter than 1024 bits when the public API refuses to.
        from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl

173 

174 

# Maximum allowed sizes, in bytes, for the files read by the import helpers
# (keys, certificates, CRLs, CSRs), to avoid loading a huge file when
# importing an object.
_MAX_KEY_SIZE = 50 * 1024
_MAX_CERT_SIZE = 50 * 1024
_MAX_CRL_SIZE = 10 * 1024 * 1024  # some are that big
_MAX_CSR_SIZE = 50 * 1024

181 

182 

183##################################################################### 

184# Some helpers 

185##################################################################### 

186 

187 

@conf.commands.register
def der2pem(der_string, obj="UNKNOWN"):
    """
    Convert a DER octet string to PEM format.

    The DER bytes are base64-encoded, folded into lines of at most 64
    characters and enclosed between "-----BEGIN <obj>-----" and
    "-----END <obj>-----" marker lines.

    :param der_string: the DER-encoded bytes.
    :param obj: label advertised in the BEGIN/END markers.
    :return: the PEM representation, as a str.
    """
    header = "-----BEGIN %s-----\n" % obj
    encoded = base64.b64encode(der_string).decode()

    # Fold the base64 text into 64-character lines.
    folded = []
    offset = 0
    while offset < len(encoded):
        folded.append(encoded[offset:offset + 64])
        offset += 64

    footer = "\n-----END %s-----\n" % obj
    return header + "\n".join(folded) + footer

198 

199 

@conf.commands.register
def pem2der(pem_string):
    """
    Convert a single PEM-encoded object (bytes) to DER format.

    The base64 payload is taken between the first "-----" followed by a
    newline (end of the BEGIN line) and the second-to-last "-----" (start of
    the END line). Carriage returns are stripped first so CRLF input works.

    :param pem_string: bytes holding exactly one PEM object.
    :return: the decoded DER bytes.
    :raises Exception: if a second "-----BEGIN" marker follows the header.
    """
    pem_string = pem_string.replace(b"\r", b"")
    first_idx = pem_string.find(b"-----\n") + 6
    if pem_string.find(b"-----BEGIN", first_idx) != -1:
        raise Exception("pem2der() expects only one PEM-encoded object")
    last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
    # bytes are immutable: the result of replace() must be kept, otherwise
    # the newline removal is a no-op.
    base64_string = pem_string[first_idx:last_idx].replace(b"\n", b"")
    return base64.b64decode(base64_string)

213 

214 

def split_pem(s):
    """
    Split concatenated PEM objects. Useful to process concatenated
    certificates.

    Each returned item starts at a "-----BEGIN" marker and ends with the line
    holding the matching "-----END" marker (including its trailing newline,
    when there is one). Data located outside of the PEM objects is dropped.

    :param s: bytes possibly containing several PEM objects.
    :return: list of bytes, one entry per PEM object, in order.
    :raises Exception: if a BEGIN marker has no END marker after it.
    """
    pem_strings = []
    while s != b"":
        start_idx = s.find(b"-----BEGIN")
        if start_idx == -1:
            break
        # Look for the END marker *after* the BEGIN marker: a stray END tag
        # located before it must not terminate (and empty) the object.
        end_idx = s.find(b"-----END", start_idx)
        if end_idx == -1:
            raise Exception("Invalid PEM object (missing END tag)")
        end_idx = s.find(b"\n", end_idx) + 1
        if end_idx == 0:
            # There is no final \n
            end_idx = len(s)
        pem_strings.append(s[start_idx:end_idx])
        s = s[end_idx:]
    return pem_strings

234 

235 

236class _PKIObj(object): 

237 def __init__(self, frmt, der): 

238 self.frmt = frmt 

239 self._der = der 

240 

241 

class _PKIObjMaker(type):
    """
    Base metaclass that loads raw PKI data and returns it as a _PKIObj.
    """

    def __call__(cls, obj_path, obj_max_size, pem_marker=None):
        """
        Import DER or PEM data transparently.

        'obj_path' is either the path of an existing file (which is read if
        its size does not exceed 'obj_max_size') or the data itself.

        When a PEM input holds several objects (like the ECDSA private keys
        output by openssl), every object is decoded and the results are
        concatenated, in order, into the 'der' value. The opposite conversion
        (a 'multi' DER into PEM) would not be valid, because the class of
        each object is not identified.

        :raises Exception: "Unable to import data" on any failure.
        """
        failure = "Unable to import data"

        if obj_path is None:
            raise Exception(failure)
        obj_path = bytes_encode(obj_path)

        # A NUL byte can never be part of a usable path: treat it as data.
        is_file = (b"\x00" not in obj_path) and os.path.isfile(obj_path)
        if not is_file:
            raw = obj_path
        else:
            if os.path.getsize(obj_path) > obj_max_size:
                raise Exception(failure)
            try:
                with open(obj_path, "rb") as fd:
                    raw = fd.read()
            except Exception:
                raise Exception(failure)

        if b"-----BEGIN" not in raw:
            return _PKIObj("DER", raw)

        try:
            der = b"".join(pem2der(part) for part in split_pem(raw))
        except Exception:
            raise Exception(failure)
        return _PKIObj("PEM", der)

282 

283 

284##################################################################### 

285# PKI objects wrappers 

286##################################################################### 

287 

288############### 

289# Public Keys # 

290############### 

291 

292 

class _PubKeyFactory(_PKIObjMaker):
    """
    Metaclass for PubKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """

    def __call__(cls, key_path=None, cryptography_obj=None):
        """
        Build a public key object.

        :param key_path: None (generate a new key), a tuple (handed to
            PubKeyRSA.import_from_tuple()), or a path / raw data holding an
            X509_SubjectPublicKeyInfo or a bare RSAPublicKey.
        :param cryptography_obj: an already-built cryptography public key,
            stored as-is in the 'pubkey' attribute.
        :raises Exception: "Unable to import public key" when the data cannot
            be parsed (the underlying error is re-raised instead when
            conf.debug_dissector is set).
        """
        # This allows to import cryptography objects directly
        if cryptography_obj is not None:
            obj = type.__call__(cls)
            obj.__class__ = cls
            obj.frmt = "original"
            obj.marker = "PUBLIC KEY"
            obj.pubkey = cryptography_obj
            return obj

        if key_path is None:
            obj = type.__call__(cls)
            if cls is PubKey:
                # The generic parent class generates an RSA key.
                cls = PubKeyRSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        # This deals with the rare RSA 'kx export' call.
        if isinstance(key_path, tuple):
            obj = type.__call__(cls)
            obj.__class__ = PubKeyRSA
            obj.frmt = "tuple"
            obj.import_from_tuple(key_path)
            return obj

        # Now for the usual calls, key_path may be the path to either:
        # _an X509_SubjectPublicKeyInfo, as processed by openssl;
        # _an RSAPublicKey;
        # _an ECDSAPublicKey;
        # _an EdDSAPublicKey.
        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
        try:
            spki = X509_SubjectPublicKeyInfo(obj._der)
            pubkey = spki.subjectPublicKey
            if isinstance(pubkey, RSAPublicKey):
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
            elif isinstance(pubkey, ECDSAPublicKey):
                obj.__class__ = PubKeyECDSA
                obj.import_from_der(obj._der)
            elif isinstance(pubkey, EdDSAPublicKey):
                obj.__class__ = PubKeyEdDSA
                obj.import_from_der(obj._der)
            else:
                # Explicit exception (a bare 'raise' has nothing to re-raise
                # here); it sends us to the RSAPublicKey fallback below.
                raise ValueError("Unsupported subjectPublicKey type")
            obj.marker = "PUBLIC KEY"
        except Exception:
            try:
                pubkey = RSAPublicKey(obj._der)
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
                obj.marker = "RSA PUBLIC KEY"
            except Exception:
                # We cannot import an ECDSA public key without curve knowledge
                if conf.debug_dissector:
                    raise
                raise Exception("Unable to import public key")
        return obj

360 

361 

class PubKey(metaclass=_PubKeyFactory):
    """
    Parent class for PubKeyRSA, PubKeyECDSA and PubKeyEdDSA.
    Provides the common verifyCert(), verifyCsr() and export() methods.
    The underlying cryptography object is stored in the 'pubkey' attribute.
    """

    def verifyCert(self, cert):
        """Verify the signature of either a Cert or an X509_Cert."""
        hash_name = _get_cert_sig_hashname(cert)
        tbs = cert.tbsCertificate
        signature = bytes(cert.signatureValue)
        return self.verify(bytes(tbs), signature, h=hash_name, t="pkcs")

    def verifyCsr(self, csr):
        """Verify the signature of a CSR."""
        hash_name = _get_csr_sig_hashname(csr)
        request_info = csr.certReq.certificationRequestInfo
        signature = bytes(csr.certReq.signature)
        return self.verify(bytes(request_info), signature, h=hash_name, t="pkcs")

    @property
    def pem(self):
        """PEM text of the key, built from 'der' and the 'marker' label."""
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        """DER encoding of the key, in SubjectPublicKeyInfo format."""
        return self.pubkey.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

    def public_numbers(self, *args, **kwargs):
        """Proxy to public_numbers() of the underlying key object."""
        return self.pubkey.public_numbers(*args, **kwargs)

    @property
    def key_size(self):
        """The key_size of the underlying key object."""
        return self.pubkey.key_size

    def export(self, filename, fmt=None):
        """
        Export public key in 'fmt' format (DER or PEM) to file 'filename'.

        When 'fmt' is None, PEM is selected for a '.pem' filename and DER
        otherwise. Returns the value returned by the file's write().
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        with open(filename, "wb") as out:
            if fmt == "DER":
                return out.write(self.der)
            if fmt == "PEM":
                return out.write(self.pem.encode())

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Verify signed data. Not implemented in the parent class.
        """
        raise NotImplementedError

421 

422 

class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
    """
    Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py
    Use the 'pubkey' attribute to access the underlying cryptography object.
    """

    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
        """
        Build the key from 'modulus' and 'pubExp' (default 65537) or, when no
        modulus is given, generate a new key of 'modulusLen' bits
        (default 2048) and keep its public part.
        """
        pubExp = pubExp or 65537
        if modulus:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            numbers = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            self.pubkey = numbers.public_key(default_backend())
        else:
            real_modulusLen = modulusLen or 2048
            if real_modulusLen >= 1024 or _RSA_512_SUPPORTED:
                generated = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            else:
                # cryptography > 43.0 compatibility
                generated = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
            self.pubkey = generated.public_key()

        self.marker = "PUBLIC KEY"

        # Lines below are only useful for the legacy part of pkcs1.py
        numbers = self.pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = numbers.n
        self._pubExp = numbers.e

    @crypto_validator
    def import_from_tuple(self, tup):
        """
        Import an (exponent, modulus, modulus length) tuple. Exponent and
        modulus may be given as bytes; the length item is not used.
        """
        # this is rarely used
        exponent, modulus, _unused_len = tup
        if isinstance(modulus, bytes):
            modulus = pkcs_os2ip(modulus)
        if isinstance(exponent, bytes):
            exponent = pkcs_os2ip(exponent)
        self.fill_and_store(modulus=modulus, pubExp=exponent)

    def import_from_asn1pkt(self, pubkey):
        """Import the modulus and public exponent of an RSAPublicKey."""
        self.fill_and_store(
            modulus=pubkey.modulus.val,
            pubExp=pubkey.publicExponent.val,
        )

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        """Encrypt 'msg' through _EncryptAndVerifyRSA.encrypt()."""
        # no ECDSA encryption support, hence no ECDSA specific keywords here
        return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """Verify 'sig' over 'msg' through _EncryptAndVerifyRSA.verify()."""
        return _EncryptAndVerifyRSA.verify(self, msg, sig, t=t, h=h, mgf=mgf, L=L)

483 

484 

class PubKeyECDSA(PubKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'pubkey' attribute to access the underlying cryptography object.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key on 'curve' (a curve class, default ec.SECP256R1)
        and keep its public part.
        """
        curve = curve or ec.SECP256R1
        private_key = ec.generate_private_key(curve(), default_backend())
        self.pubkey = private_key.public_key()
        # Like PubKeyRSA.fill_and_store(): without a marker, the 'pem'
        # property (and PEM export) of a generated key raises AttributeError.
        self.marker = "PUBLIC KEY"

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the key from DER bytes ('pubkey')."""
        # No lib support for explicit curves nor compressed points.
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, h="sha256", **kwargs):
        """Always raises: encryption is not supported for ECDSA keys."""
        raise Exception("No ECDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Return True if 'sig' is a valid signature of 'msg' for hash 'h',
        False otherwise. Extra keyword arguments are ignored.
        """
        # 'sig' should be a DER-encoded signature, as per RFC 3279
        try:
            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
            return True
        except InvalidSignature:
            return False

516 

517 

class PubKeyEdDSA(PubKey):
    """
    Wrapper for EdDSA keys based on the cryptography library.
    Use the 'pubkey' attribute to access the underlying cryptography object.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key with 'curve' (a private key class exposing
        generate()) and keep its public part.
        """
        # NOTE(review): the default is an X25519 (key exchange) class, while
        # verify() below needs a key object exposing verify(sig, msg).
        # Presumably an Ed25519 class is intended here; confirm before
        # changing the default.
        curve = curve or x25519.X25519PrivateKey
        private_key = curve.generate()
        self.pubkey = private_key.public_key()
        # Like PubKeyRSA.fill_and_store(): without a marker, the 'pem'
        # property (and PEM export) of a generated key raises AttributeError.
        self.marker = "PUBLIC KEY"

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the key from DER bytes ('pubkey')."""
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, **kwargs):
        """Always raises: encryption is not supported for EdDSA keys."""
        raise Exception("No EdDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """
        Return True if 'sig' is a valid signature of 'msg', False otherwise.
        Extra keyword arguments are ignored.
        """
        # 'sig' and 'msg' are handed unchanged to the underlying key object.
        try:
            self.pubkey.verify(sig, msg)
            return True
        except InvalidSignature:
            return False

548 

549 

550################ 

551# Private Keys # 

552################ 

553 

554 

class _PrivKeyFactory(_PKIObjMaker):
    """
    Metaclass for PrivKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """

    def __call__(cls, key_path=None, cryptography_obj=None):
        """
        key_path may be the path to either:
        _an RSAPrivateKey_OpenSSL (as generated by openssl);
        _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
        _an RSAPrivateKey;
        _an ECDSAPrivateKey;
        _an EdDSAPrivateKey.

        With neither key_path nor cryptography_obj, a new key is generated
        (an ECDSA one when called on the generic PrivKey class).
        """
        if cryptography_obj is None and key_path is None:
            obj = type.__call__(cls)
            if cls is PrivKey:
                cls = PrivKeyECDSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        if cryptography_obj is not None:
            # This allows to import cryptography objects directly.
            # We (stupidly) need to go through the whole import process because
            # RSA does more than just importing the cryptography objects...
            pkcs8 = cryptography_obj.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )
            obj = _PKIObj("DER", pkcs8)
        else:
            # Load from file
            obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)

        # Parsers are tried in this exact order; the first one that accepts
        # the DER data decides the class and the PEM marker of the result.
        # 'wrapped' parsers hold the actual key in their 'privateKey' field.
        candidates = (
            (RSAPrivateKey_OpenSSL, True, PrivKeyRSA, "PRIVATE KEY"),
            (ECDSAPrivateKey_OpenSSL, True, PrivKeyECDSA, "EC PRIVATE KEY"),
            (RSAPrivateKey, False, PrivKeyRSA, "RSA PRIVATE KEY"),
            (ECDSAPrivateKey, False, PrivKeyECDSA, "EC PRIVATE KEY"),
            (EdDSAPrivateKey, False, PrivKeyEdDSA, "PRIVATE KEY"),
        )
        for parser, wrapped, key_cls, marker in candidates:
            try:
                privkey = parser(obj._der)
                if wrapped:
                    privkey = privkey.privateKey
                obj.__class__ = key_cls
                obj.marker = marker
            except Exception:
                continue
            break
        else:
            raise Exception("Unable to import private key")

        try:
            obj.import_from_asn1pkt(privkey)
        except ImportError:
            pass
        return obj

627 

628 

class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING):
    """An ASN1_BIT_STRING that ignores BER encoding"""

    def __bytes__(self):
        # Return the readable value as-is instead of an encoded form.
        return self.val_readable

    # str() yields the same raw value as bytes().
    __str__ = __bytes__

636 

637 

class PrivKey(metaclass=_PrivKeyFactory):
    """
    Parent class for PrivKeyRSA, PrivKeyECDSA and PrivKeyEdDSA.
    Provides common signTBSCert(), resignCert(), verifyCert(), verifyCsr()
    and export() methods.
    """

    def signTBSCert(self, tbsCert, h="sha256"):
        """
        Sign 'tbsCert' and return the resulting X509_Cert.

        Note that this will always copy the signature field from the
        tbsCertificate into the signatureAlgorithm field of the result,
        regardless of the coherence between its contents (which might
        indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2).

        When 'h' is falsy, the hash is looked up in hash_by_oid from that
        signature field.

        There is a small inheritance trick for the computation of the
        signature: in order to use a sign() method which would apply
        to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the
        subclasses accept any argument, be it from the RSA or ECDSA world,
        and then they keep the ones they're interested in.
        Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign().
        """
        algorithm = tbsCert.signature
        h = h or hash_by_oid[algorithm.algorithm.val]
        raw_signature = self.sign(bytes(tbsCert), h=h, t="pkcs")

        signed = X509_Cert()
        signed.tbsCertificate = tbsCert
        signed.signatureAlgorithm = algorithm
        signed.signatureValue = _Raw_ASN1_BIT_STRING(raw_signature, readable=True)
        return signed

    def resignCert(self, cert):
        """Rewrite the signature of either a Cert or an X509_Cert."""
        # h=None: let signTBSCert() derive the hash from the certificate.
        return self.signTBSCert(cert.tbsCertificate, h=None)

    def verifyCert(self, cert):
        """Verifies either a Cert or an X509_Cert."""
        return self.pubkey.verifyCert(cert)

    def verifyCsr(self, cert):
        """Verifies a CSR."""
        return self.pubkey.verifyCsr(cert)

    @property
    def pem(self):
        """PEM text of the key, built from 'der' and the 'marker' label."""
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        """Unencrypted DER encoding of the key, in PKCS8 format."""
        return self.key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    def export(self, filename, fmt=None):
        """
        Export private key in 'fmt' format (DER or PEM) to file 'filename'.

        When 'fmt' is None, PEM is selected for a '.pem' filename and DER
        otherwise. Returns the value returned by the file's write().
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        with open(filename, "wb") as out:
            if fmt == "DER":
                return out.write(self.der)
            if fmt == "PEM":
                return out.write(self.pem.encode())

    @crypto_validator
    def sign(self, data, h="sha256", **kwargs):
        """
        Sign data. Not implemented in the parent class.
        """
        raise NotImplementedError

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Verify signed data. Not implemented in the parent class.
        """
        raise NotImplementedError

720 

721 

class PrivKeyRSA(PrivKey, _DecryptAndSignRSA):
    """
    Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py
    Use the 'key' attribute to access original object.
    """

    @crypto_validator
    def fill_and_store(
        self,
        modulus=None,
        modulusLen=None,
        pubExp=None,
        prime1=None,
        prime2=None,
        coefficient=None,
        exponent1=None,
        exponent2=None,
        privExp=None,
    ):
        """
        Build the key from its full set of RSA numbers or, if any of them is
        missing, generate a new key of 'modulusLen' bits (default 2048).
        The public exponent defaults to 65537.
        """
        pubExp = pubExp or 65537
        required = (
            modulus,
            prime1,
            prime2,
            coefficient,
            privExp,
            exponent1,
            exponent2,
        )
        if None in required:
            # note that the library requires every parameter
            # in order to call RSAPrivateNumbers(...)
            # if one of these is missing, we generate a whole new key
            real_modulusLen = modulusLen or 2048
            if real_modulusLen >= 1024 or _RSA_512_SUPPORTED:
                self.key = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            else:
                # cryptography > 43.0 compatibility
                self.key = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
        else:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            public_part = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            private_part = rsa.RSAPrivateNumbers(
                p=prime1,
                q=prime2,
                dmp1=exponent1,
                dmq1=exponent2,
                iqmp=coefficient,
                d=privExp,
                public_numbers=public_part,
            )
            self.key = private_part.private_key(default_backend())
        pubkey = self.key.public_key()

        self.marker = "PRIVATE KEY"

        # Lines below are only useful for the legacy part of pkcs1.py
        numbers = pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = numbers.n
        self._pubExp = numbers.e

        # The matching public key is built through the tuple import path.
        self.pubkey = PubKeyRSA((numbers.e, numbers.n, real_modulusLen))

    def import_from_asn1pkt(self, privkey):
        """Import every RSA number found in an RSAPrivateKey packet."""
        self.fill_and_store(
            modulus=privkey.modulus.val,
            pubExp=privkey.publicExponent.val,
            privExp=privkey.privateExponent.val,
            prime1=privkey.prime1.val,
            prime2=privkey.prime2.val,
            exponent1=privkey.exponent1.val,
            exponent2=privkey.exponent2.val,
            coefficient=privkey.coefficient.val,
        )

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """Verify 'sig' over 'msg' with the associated public key."""
        public_key = self.pubkey
        return public_key.verify(msg=msg, sig=sig, t=t, h=h, mgf=mgf, L=L)

    def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
        """Sign 'data' through _DecryptAndSignRSA.sign()."""
        return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L)

827 

828 

class PrivKeyECDSA(PrivKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'key' attribute to access original object.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key on 'curve' (a curve class, default ec.SECP256R1)."""
        curve_cls = curve or ec.SECP256R1
        self.key = ec.generate_private_key(curve_cls(), default_backend())
        public_part = self.key.public_key()
        self.pubkey = PubKeyECDSA(cryptography_obj=public_part)
        self.marker = "EC PRIVATE KEY"

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the DER bytes of the 'privkey' packet."""
        encoded = bytes(privkey)
        self.key = serialization.load_der_private_key(
            encoded, None, backend=default_backend()
        )
        public_part = self.key.public_key()
        self.pubkey = PubKeyECDSA(cryptography_obj=public_part)
        self.marker = "EC PRIVATE KEY"

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """Verify 'sig' over 'msg' with the associated public key."""
        public_key = self.pubkey
        return public_key.verify(msg=msg, sig=sig, h=h, **kwargs)

    @crypto_validator
    def sign(self, data, h="sha256", **kwargs):
        """Sign 'data' using hash 'h'. Extra keyword arguments are ignored."""
        algorithm = ec.ECDSA(_get_hash(h))
        return self.key.sign(data, algorithm)

857 

858 

class PrivKeyEdDSA(PrivKey):
    """
    Wrapper for EdDSA keys
    Use the 'key' attribute to access original object.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key with 'curve' (a private key class exposing
        generate()).
        """
        # NOTE(review): the default is an X25519 (key exchange) class, while
        # sign() below needs a key object exposing sign(data). Presumably an
        # Ed25519 class is intended here; confirm before changing the default.
        curve = curve or x25519.X25519PrivateKey
        self.key = curve.generate()
        # Wrap the public part in the EdDSA wrapper: PubKeyECDSA.verify()
        # passes an ec.ECDSA(...) argument that does not apply to these keys.
        self.pubkey = PubKeyEdDSA(cryptography_obj=self.key.public_key())
        self.marker = "PRIVATE KEY"

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the DER bytes of the 'privkey' packet."""
        self.key = serialization.load_der_private_key(
            bytes(privkey), None, backend=default_backend()
        )
        # Same as in fill_and_store(): the public key must be a PubKeyEdDSA.
        self.pubkey = PubKeyEdDSA(cryptography_obj=self.key.public_key())
        self.marker = "PRIVATE KEY"

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """Verify 'sig' over 'msg' with the associated public key."""
        return self.pubkey.verify(msg=msg, sig=sig, **kwargs)

    @crypto_validator
    def sign(self, data, **kwargs):
        """Sign 'data'. Extra keyword arguments are ignored."""
        return self.key.sign(data)

887 

888 

889################ 

890# Certificates # 

891################ 

892 

893 

class _CertMaker(_PKIObjMaker):
    """
    Metaclass for Cert creation. It is not necessary as it was for the keys,
    but we reuse the model instead of creating redundant constructors.
    """

    def __call__(cls, cert_path=None, cryptography_obj=None):
        """
        Build a Cert from 'cert_path' (path or raw DER/PEM data) or from a
        cryptography certificate object.

        :raises Exception: "Unable to import certificate" when the DER data
            cannot be parsed (the underlying error is re-raised instead when
            conf.debug_dissector is set).
        """
        if cryptography_obj is None:
            # Load from file
            obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CERT_SIZE, "CERTIFICATE")
        else:
            # This allows to import cryptography objects directly
            der = cryptography_obj.public_bytes(
                encoding=serialization.Encoding.DER,
            )
            obj = _PKIObj("DER", der)

        obj.__class__ = Cert
        obj.marker = "CERTIFICATE"
        try:
            parsed = X509_Cert(obj._der)
        except Exception:
            if conf.debug_dissector:
                raise
            raise Exception("Unable to import certificate")
        obj.import_from_asn1pkt(parsed)
        return obj

922 

923 

def _get_cert_sig_hashname(cert):
    """
    Look up, in hash_by_oid, the hash matching the algorithm OID found in the
    'signature' field of the certificate's tbsCertificate.
    """
    return hash_by_oid[cert.tbsCertificate.signature.algorithm.val]

931 

932 

def _get_csr_sig_hashname(csr):
    """
    Look up, in hash_by_oid, the hash matching the algorithm OID found in the
    'signatureAlgorithm' field of the CSR's certReq.
    """
    return hash_by_oid[csr.certReq.signatureAlgorithm.algorithm.val]

940 

941 

class Cert(metaclass=_CertMaker):
    """
    Wrapper for the X509_Cert from layers/x509.py.
    Use the 'x509Cert' attribute to access original object.
    """

    def import_from_asn1pkt(self, cert):
        """
        Populate the attributes of this wrapper from a parsed X509_Cert.

        :param cert: the X509_Cert packet to wrap.
        :raises Exception: if a validity date cannot be converted to a
            time tuple.
        """
        error_msg = "Unable to import certificate"

        self.x509Cert = cert

        tbsCert = cert.tbsCertificate

        # The version field holds 'version - 1'. When it is not set, the
        # certificate is handled as a version 1 certificate.
        if tbsCert.version:
            self.version = tbsCert.version.val + 1
        else:
            self.version = 1
        self.serial = tbsCert.serialNumber.val
        self.sigAlg = tbsCert.signature.algorithm.oidname
        self.issuer = tbsCert.get_issuer()
        self.issuer_str = tbsCert.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)
        self.subject = tbsCert.get_subject()
        self.subject_str = tbsCert.get_subject_str()
        self.subject_hash = hash(self.subject_str)
        self.authorityKeyID = None

        self.notBefore_str = tbsCert.validity.not_before.pretty_time
        try:
            self.notBefore = tbsCert.validity.not_before.datetime.timetuple()
        except ValueError:
            raise Exception(error_msg)
        self.notBefore_str_simple = time.strftime("%x", self.notBefore)

        self.notAfter_str = tbsCert.validity.not_after.pretty_time
        try:
            self.notAfter = tbsCert.validity.not_after.datetime.timetuple()
        except ValueError:
            raise Exception(error_msg)
        self.notAfter_str_simple = time.strftime("%x", self.notAfter)

        self.pubkey = PubKey(bytes(tbsCert.subjectPublicKeyInfo))

        # 'cA', 'keyUsage' and 'extKeyUsage' are only set when the matching
        # extension is present in the certificate.
        if tbsCert.extensions:
            for extn in tbsCert.extensions:
                if extn.extnID.oidname == "basicConstraints":
                    self.cA = False
                    if extn.extnValue.cA:
                        self.cA = not (extn.extnValue.cA.val == 0)
                elif extn.extnID.oidname == "keyUsage":
                    self.keyUsage = extn.extnValue.get_keyUsage()
                elif extn.extnID.oidname == "extKeyUsage":
                    self.extKeyUsage = extn.extnValue.get_extendedKeyUsage()
                elif extn.extnID.oidname == "authorityKeyIdentifier":
                    # Guard against an extension that carries no
                    # keyIdentifier: authorityKeyID then stays None.
                    keyid = extn.extnValue.keyIdentifier
                    if keyid is not None:
                        self.authorityKeyID = keyid.val

        self.signatureValue = bytes(cert.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuer(self, other):
        """
        True if 'other' issued 'self', i.e.:
          - self.issuer == other.subject
          - self is signed by other
        """
        if self.issuer_hash != other.subject_hash:
            return False
        return other.pubkey.verifyCert(self)

    def isIssuerCert(self, other):
        """
        Alias of isIssuer().
        """
        return self.isIssuer(other)

    def isSelfSigned(self):
        """
        Return True if the certificate is self-signed:
          - issuer and subject are the same
          - the signature of the certificate is valid.
        """
        if self.issuer_hash == self.subject_hash:
            return self.isIssuer(self)
        return False

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        """
        Encrypt 'msg' with the public key of this certificate.
        """
        # no ECDSA *encryption* support, hence only RSA specific keywords here
        return self.pubkey.encrypt(msg, t=t, h=h, mgf=mgf, L=L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """
        Verify the signature 'sig' of 'msg' with the public key of this
        certificate, and return the result of pubkey.verify().
        """
        return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)

    def getCertSignatureHash(self):
        """
        Return the hash cryptography object used by the 'signatureAlgorithm'
        """
        return _get_hash(_get_cert_sig_hashname(self))

    def setSubjectPublicKeyFromPrivateKey(self, key):
        """
        Replace the subjectPublicKeyInfo of this certificate with the one from
        the provided key.

        :param key: a PubKey, or a PrivKey whose public part is used.
        :raises ValueError: if 'key' is neither a PubKey nor a PrivKey.
        """
        if isinstance(key, (PubKey, PrivKey)):
            if isinstance(key, PrivKey):
                pubkey = key.pubkey
            else:
                pubkey = key
            self.tbsCertificate.subjectPublicKeyInfo = X509_SubjectPublicKeyInfo(
                pubkey.der
            )
        else:
            raise ValueError("Unknown type 'key', should be PubKey or PrivKey")

    def resignWith(self, key):
        """
        Resign a certificate with a specific key
        """
        # Re-import so that the cached attributes match the re-signed packet.
        self.import_from_asn1pkt(key.resignCert(self))

    def remainingDays(self, now=None):
        """
        Based on the value of notAfter field, returns the number of
        days the certificate will still be valid. The date used for the
        comparison is the current and local date, as returned by
        time.localtime(), except if 'now' argument is provided another
        one. 'now' argument can be given as either a time tuple or a string
        representing the date. Accepted format for the string version
        are:

         - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
         - '%m/%d/%y' e.g. '01/30/08' (less precise)

        If the certificate is no more valid at the date considered, then
        a negative value is returned representing the number of days
        since it has expired.

        The number of days is returned as a float to deal with the unlikely
        case of certificates that are still just valid.
        """
        if now is None:
            now = time.localtime()
        elif isinstance(now, str):
            try:
                if "/" in now:
                    now = time.strptime(now, "%m/%d/%y")
                else:
                    now = time.strptime(now, "%b %d %H:%M:%S %Y %Z")
            except Exception:
                warning("Bad time string provided, will use localtime() instead.")
                now = time.localtime()

        now = time.mktime(now)
        nft = time.mktime(self.notAfter)
        diff = (nft - now) / (24.0 * 3600)
        return diff

    def isRevoked(self, crl_list):
        """
        Given a list of trusted CRL (their signature has already been
        verified with trusted anchors), this function returns True if
        the certificate is marked as revoked by one of those CRL.

        Note that if the Certificate was on hold in a previous CRL and
        is now valid again in a new CRL and both are in the list, it
        will be considered revoked: this is because _all_ CRLs are
        checked (not only the freshest) and revocation status is not
        handled.

        Also note that a CRL is considered to come from the issuer of this
        certificate if the Authority Key Identifiers match (when available
        in _both_ the CRL and the Cert), or if the issuers are the same.
        """
        for crl in crl_list:
            # A CRL object might not expose 'authorityKeyID' at all.
            crl_akid = getattr(crl, "authorityKeyID", None)
            same_issuer = (
                self.authorityKeyID is not None
                and crl_akid is not None
                and self.authorityKeyID == crl_akid
            ) or self.issuer == crl.issuer
            if not same_issuer:
                continue
            # Do not stop on the first matching CRL: a later one might be
            # the one that lists this serial number.
            if any(self.serial == entry[0] for entry in crl.revoked_cert_serials):
                return True
        return False

    @property
    def tbsCertificate(self):
        return self.x509Cert.tbsCertificate

    @property
    def pem(self):
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        return bytes(self.x509Cert)

    @property
    def pubKey(self):
        warnings.warn(
            "Cert.pubKey is deprecated and will be removed in a future version. "
            "Use Cert.pubkey",
            DeprecationWarning,
        )
        return self.pubkey

    @property
    def extensions(self):
        return self.tbsCertificate.extensions

    def __eq__(self, other):
        return self.der == other.der

    def __hash__(self):
        return hash(self.der)

    def export(self, filename, fmt=None):
        """
        Export certificate in 'fmt' format (DER or PEM) to file 'filename'

        When 'fmt' is None, PEM is used if 'filename' ends with ".pem" and
        DER otherwise. Returns the result of the write() call.
        """
        if fmt is None:
            if filename.endswith(".pem"):
                fmt = "PEM"
            else:
                fmt = "DER"
        with open(filename, "wb") as f:
            if fmt == "DER":
                return f.write(self.der)
            elif fmt == "PEM":
                return f.write(self.pem.encode())

    def show(self):
        """
        Print the serial, issuer, subject and validity of the certificate.
        """
        print("Serial: %s" % self.serial)
        print("Issuer: " + self.issuer_str)
        print("Subject: " + self.subject_str)
        print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str))

    def __repr__(self):
        return "[X.509 Cert. Subject:%s, Issuer:%s]" % (
            self.subject_str,
            self.issuer_str,
        )

1180 

1181 

1182################################ 

1183# Certificate Revocation Lists # 

1184################################ 

1185 

1186 

class _CRLMaker(_PKIObjMaker):
    """
    Metaclass used to build CRL objects. Unlike for the keys, it is not
    strictly needed, but following the same model avoids writing redundant
    constructors.
    """

    def __call__(cls, cert_path):
        # Let the generic PKI object maker load the data, then turn the
        # result into a CRL.
        crl_obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL")
        crl_obj.__class__ = CRL
        try:
            parsed = X509_CRL(crl_obj._der)
        except Exception:
            raise Exception("Unable to import CRL")
        else:
            crl_obj.import_from_asn1pkt(parsed)
        return crl_obj

1202 

1203 

class CRL(metaclass=_CRLMaker):
    """
    Wrapper for the X509_CRL from layers/x509.py.
    Use the 'x509CRL' attribute to access original object.
    """

    @staticmethod
    def _parse_time(value, error_msg):
        """
        Drop the trailing 'Z' of a time string (if any) and parse the rest
        with the "%y%m%d%H%M%S" format.

        :returns: a (stripped string, time.struct_time) tuple.
        :raises Exception: with 'error_msg' if the string cannot be parsed.
        """
        if value[-1] == "Z":
            value = value[:-1]
        try:
            parsed = time.strptime(value, "%y%m%d%H%M%S")
        except Exception:
            raise Exception(error_msg)
        return value, parsed

    def import_from_asn1pkt(self, crl):
        """
        Populate the attributes of this wrapper from a parsed X509_CRL.

        :param crl: the X509_CRL packet to wrap.
        :raises Exception: if one of the dates of the CRL cannot be parsed.
        """
        error_msg = "Unable to import CRL"

        self.x509CRL = crl

        tbsCertList = crl.tbsCertList
        self.tbsCertList = bytes(tbsCertList)

        # The version field holds 'version - 1'. When it is not set, the
        # CRL is handled as a version 1 CRL.
        if tbsCertList.version:
            self.version = tbsCertList.version.val + 1
        else:
            self.version = 1
        self.sigAlg = tbsCertList.signature.algorithm.oidname
        self.issuer = tbsCertList.get_issuer()
        self.issuer_str = tbsCertList.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)
        # Always defined, as Cert.isRevoked() reads it on every CRL.
        self.authorityKeyID = None

        self.lastUpdate_str = tbsCertList.this_update.pretty_time
        _, self.lastUpdate = self._parse_time(tbsCertList.this_update.val, error_msg)
        self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)

        # nextUpdate is optional: define all three attributes in any case so
        # that show() also works when it is absent.
        self.nextUpdate = None
        self.nextUpdate_str = None
        self.nextUpdate_str_simple = None
        if tbsCertList.next_update:
            self.nextUpdate_str = tbsCertList.next_update.pretty_time
            _, self.nextUpdate = self._parse_time(
                tbsCertList.next_update.val, error_msg
            )
            self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)

        # 'number' is only set when the cRLNumber extension is present.
        if tbsCertList.crlExtensions:
            for extension in tbsCertList.crlExtensions:
                if extension.extnID.oidname == "cRLNumber":
                    self.number = extension.extnValue.cRLNumber.val
                elif extension.extnID.oidname == "authorityKeyIdentifier":
                    # Same handling as in Cert.import_from_asn1pkt, guarded
                    # against a missing keyIdentifier.
                    keyid = getattr(extension.extnValue, "keyIdentifier", None)
                    if keyid is not None:
                        self.authorityKeyID = keyid.val

        # List of (serial number, revocation date string without 'Z').
        revoked = []
        if tbsCertList.revokedCertificates:
            for cert in tbsCertList.revokedCertificates:
                serial = cert.serialNumber.val
                # The date is only parsed to validate it: the string is kept.
                date, _ = self._parse_time(cert.revocationDate.val, error_msg)
                revoked.append((serial, date))
        self.revoked_cert_serials = revoked

        self.signatureValue = bytes(crl.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuer(self, other):
        """
        True if 'other' issued this CRL (same logic as Cert.isIssuer).
        """
        # This is exactly the same thing as in Cert method.
        if self.issuer_hash != other.subject_hash:
            return False
        return other.pubkey.verifyCert(self)

    def verify(self, anchors):
        """
        Return True iff the CRL is signed by one of the provided anchors.
        """
        return any(self.isIssuer(a) for a in anchors)

    def show(self):
        """
        Print the version, signature algorithm, issuer and update dates.
        """
        print("Version: %d" % self.version)
        print("sigAlg: " + self.sigAlg)
        print("Issuer: " + self.issuer_str)
        print("lastUpdate: %s" % self.lastUpdate_str)
        print("nextUpdate: %s" % self.nextUpdate_str)

1288 

1289 

1290############################### 

1291# Certificate Signing Request # 

1292############################### 

1293 

1294 

class _CSRMaker(_PKIObjMaker):
    """
    Metaclass used to build CSR objects. Unlike for the keys, it is not
    strictly needed, but following the same model avoids writing redundant
    constructors.
    """

    def __call__(cls, cert_path):
        csr_obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CSR_SIZE)
        csr_obj.__class__ = CSR
        try:
            # First attempt: PKCS#10 format
            parsed = PKCS10_CertificationRequest(csr_obj._der)
            csr_obj.marker = "NEW CERTIFICATE REQUEST"
            csr_obj.fmt = CSR.FORMAT.PKCS10
        except Exception:
            try:
                # Second attempt: CMC format
                parsed = CMS_ContentInfo(csr_obj._der)
                csr_obj.marker = "NEW CERTIFICATE REQUEST"
                csr_obj.fmt = CSR.FORMAT.CMC
            except Exception:
                # Neither of the two formats could be parsed
                raise Exception("Unable to import CSR")

        csr_obj.import_from_asn1pkt(parsed)
        return csr_obj

1320 

1321 

class CSR(metaclass=_CSRMaker):
    """
    Wrapper for the CSR formats.
    This can handle both PKCS#10 and CMC formats.
    """

    class FORMAT(enum.Enum):
        """
        The format used by the CSR.
        """

        PKCS10 = "PKCS#10"
        CMC = "CMC"

    def import_from_asn1pkt(self, csr):
        """
        Populate the attributes of this wrapper from a parsed CSR packet.

        :param csr: the parsed request (PKCS#10 request or CMS ContentInfo,
            depending on 'self.fmt').
        """
        self.csr = csr
        certReqInfo = self.certReq.certificationRequestInfo

        # Subject
        self.subject = certReqInfo.get_subject()
        self.subject_str = certReqInfo.get_subject_str()
        self.subject_hash = hash(self.subject_str)

        # pubkey
        self.pubkey = PubKey(bytes(certReqInfo.subjectPublicKeyInfo))

        # Get the "subjectKeyIdentifier" from the "extensionRequest" attribute.
        # 'sid' is None if either the attribute or the extension is missing.
        try:
            extReq = next(
                x.values[0].value
                for x in certReqInfo.attributes
                if x.type.val == "1.2.840.113549.1.9.14"  # extensionRequest
            )
            self.sid = next(
                x.extnValue.keyIdentifier
                for x in extReq.extensions
                if x.extnID.val == "2.5.29.14"  # subjectKeyIdentifier
            )
        except StopIteration:
            self.sid = None

    @property
    def certReq(self):
        """
        The inner certification request, whatever the format of the CSR.

        :raises ValueError: if the CMC wrapping is not the expected one, or
            if the format is unknown.
        """
        csr = self.csr

        if self.fmt == CSR.FORMAT.PKCS10:
            return csr
        elif self.fmt == CSR.FORMAT.CMC:
            if (
                csr.contentType.oidname != "id-signedData"
                or csr.content.encapContentInfo.eContentType.oidname != "id-cct-PKIData"
            ):
                raise ValueError("Invalid CMC wrapping !")
            # Only the first request of the sequence is used.
            req = csr.content.encapContentInfo.eContent.reqSequence[0]
            return req.request.certificationRequest
        else:
            raise ValueError("Invalid CSR format !")

    @property
    def pem(self):
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        return bytes(self.csr)

    def __eq__(self, other):
        return self.der == other.der

    def __hash__(self):
        return hash(self.der)

    def isIssuer(self, other):
        """
        True if 'other' has the same subject key identifier as this CSR.
        """
        return other.sid == self.sid

    def isSelfSigned(self):
        """
        Always True for a CSR.
        """
        return True

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """
        Verify the signature 'sig' of 'msg' with the public key of the CSR.
        """
        return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)

    def export(self, filename, fmt=None):
        """
        Export certificate in 'fmt' format (DER or PEM) to file 'filename'

        When 'fmt' is None, PEM is used if 'filename' ends with ".pem" and
        DER otherwise. Returns the result of the write() call.
        """
        if fmt is None:
            if filename.endswith(".pem"):
                fmt = "PEM"
            else:
                fmt = "DER"
        with open(filename, "wb") as f:
            if fmt == "DER":
                return f.write(self.der)
            elif fmt == "PEM":
                return f.write(self.pem.encode())

    def show(self):
        """
        Print the subject and the attribute types of the CSR.
        """
        certReqInfo = self.certReq.certificationRequestInfo

        print("Subject: " + self.subject_str)
        print("Attributes:")
        for attr in certReqInfo.attributes:
            print(" - %s" % attr.type.oidname)

    def verifySelf(self) -> bool:
        """
        Verify the signatures of the CSR

        For the CMC format, the CMS wrapping is verified first, trusting only
        this CSR. Any ValueError or KeyError raised on the way (the latter is
        what CertList.findCertBySid raises when the signer is not found)
        makes this return False, so that __repr__ never fails because of it.
        """
        if self.fmt == self.FORMAT.CMC:
            try:
                cms_engine = CMS_Engine([self])
                cms_engine.verify(self.csr)
                return self.pubkey.verifyCsr(self)
            except (ValueError, KeyError):
                return False
        elif self.fmt == self.FORMAT.PKCS10:
            return self.pubkey.verifyCsr(self)
        else:
            return False

    def __repr__(self):
        # Note: building the representation verifies the CSR signature.
        return "[CSR Format: %s, Subject:%s, Verified: %s]" % (
            self.fmt.value,
            self.subject_str,
            self.verifySelf(),
        )

1448 

1449 

1450#################### 

1451# Certificate list # 

1452#################### 

1453 

1454 

class CertList(list):
    """
    An object that can store a list of Cert objects, load them and export them
    into DER/PEM format.
    """

    def __init__(
        self,
        certList: Union[Self, List[Cert], List[CSR], Cert, str],
    ):
        """
        Construct a list of certificates/CRLs to be used as list of ROOT certificates.

        :param certList: either a path to a file holding one or several
            certificates, a single Cert, or an iterable of objects.
        """
        # Parse the certificate list / CA
        if isinstance(certList, str):
            # It's a path. First get the _PKIObj
            obj = _PKIObjMaker.__call__(
                CertList, certList, _MAX_CERT_SIZE, "CERTIFICATE"
            )

            # Then parse the der until there's nothing left
            certList = []
            payload = obj._der
            while payload:
                cert = X509_Cert(payload)
                # Whatever follows the certificate that was just parsed is
                # the input of the next iteration.
                if conf.raw_layer in cert.payload:
                    payload = cert.payload.load
                else:
                    payload = None
                cert.remove_payload()
                certList.append(Cert(cert))

            self.frmt = obj.frmt
        elif isinstance(certList, Cert):
            certList = [certList]
            self.frmt = "PEM"
        else:
            self.frmt = "PEM"

        super(CertList, self).__init__(certList)

    def findCertBySid(self, sid):
        """
        Find a certificate in the list by SubjectIDentifier.

        A Cert is matched against a CMS_IssuerAndSerialNumber on both its
        issuer and its serial number. A CSR is matched against a
        CMS_SubjectKeyIdentifier on its subject key identifier.

        :raises KeyError: if no entry of the list matches 'sid'.
        """
        for cert in self:
            if isinstance(cert, Cert) and isinstance(sid, CMS_IssuerAndSerialNumber):
                # The issuer alone does not identify a certificate: several
                # certificates of the list may come from the same issuer.
                serial = getattr(sid.serialNumber, "val", sid.serialNumber)
                if cert.issuer == sid.get_issuer() and cert.serial == serial:
                    return cert
            elif isinstance(cert, CSR) and isinstance(sid, CMS_SubjectKeyIdentifier):
                if cert.sid == sid.sid:
                    return cert
        raise KeyError("Certificate not found !")

    def export(self, filename, fmt=None):
        """
        Export a list of certificates 'fmt' format (DER or PEM) to file 'filename'

        When 'fmt' is None, PEM is used if 'filename' ends with ".pem" and
        DER otherwise. Returns the result of the write() call.
        """
        if fmt is None:
            if filename.endswith(".pem"):
                fmt = "PEM"
            else:
                fmt = "DER"
        with open(filename, "wb") as f:
            if fmt == "DER":
                return f.write(self.der)
            elif fmt == "PEM":
                return f.write(self.pem.encode())

    @property
    def der(self):
        # Concatenation of the DER form of every entry
        return b"".join(x.der for x in self)

    @property
    def pem(self):
        # Concatenation of the PEM form of every entry
        return "".join(x.pem for x in self)

    def __repr__(self):
        return "<CertList %s certificates>" % (len(self),)

    def show(self):
        """
        Print one line per entry: its index followed by its representation.
        """
        for i, c in enumerate(self):
            print(conf.color_theme.id(i, fmt="%04i"), end=" ")
            print(repr(c))

1539 

1540 

1541###################### 

1542# Certificate chains # 

1543###################### 

1544 

1545 

class CertTree(CertList):
    """
    An extension to CertList that additionally has a list of ROOT CAs
    that are trusted.

    Example::

        >>> tree = CertTree("ca_chain.pem")
        >>> tree.show()
        /CN=DOMAIN-DC1-CA/dc=DOMAIN [Self Signed]
        /CN=Administrator/dc=DOMAIN [Not Self Signed]
    """

    __slots__ = ["frmt", "rootCAs"]

    def __init__(
        self,
        certList: Union[List[Cert], CertList, str],
        rootCAs: Union[List[Cert], CertList, Cert, str, None] = None,
    ):
        """
        Construct a chain of certificates that follows issuer/subject matching and
        respects signature validity.

        Note that we do not check AKID/{SKID/issuer/serial} matching,
        nor the presence of keyCertSign in keyUsage extension (if present).

        :param certList: a list of Cert/CRL objects (or path to PEM/DER file containing
                         multiple certs/CRL) to try to chain.
        :param rootCAs: (optional) a list of certificates to trust. If not provided,
                        trusts any self-signed certificates from the certList.
        """
        # Parse the certificate list
        certList = CertList(certList)

        # Find the ROOT CAs if store isn't specified
        if not rootCAs:
            # Build cert store.
            self.rootCAs = CertList([x for x in certList if x.isSelfSigned()])
            # And remove those certs from the list
            for cert in self.rootCAs:
                certList.remove(cert)
        else:
            # Store cert store.
            self.rootCAs = CertList(rootCAs)
            # And remove those certs from the list if present (remove dups)
            for cert in self.rootCAs:
                if cert in certList:
                    certList.remove(cert)

        # Append our root CAs to the certList
        certList.extend(self.rootCAs)

        # Super instantiate
        super(CertTree, self).__init__(certList)

    @property
    def tree(self):
        """
        Get a tree-like object of the certificate list

        :returns: a list of (certificate, children) tuples, one per ROOT CA,
            where 'children' is itself a list of such tuples.
        """
        # The tree is a list of (certificate, children) tuples.
        tree = [(x, []) for x in self.rootCAs]

        # Certificates that have not been attached to the tree yet
        certList = list(self)

        # We make a list of certificates we have to search children for, and iterate
        # through it until it's empty.
        todo = list(tree)

        # Iterate
        while todo:
            cert, children = todo.pop()
            # Rebuild the list of unattached certificates rather than removing
            # entries from the list being iterated over, which would skip the
            # element that follows each match.
            remaining = []
            for c in certList:
                # Check if this certificate matches the one we're looking at
                if c.isIssuer(cert) and c != cert:
                    item = (c, [])
                    children.append(item)
                    todo.append(item)
                else:
                    remaining.append(c)
            certList = remaining

        return tree

    def getchain(self, cert):
        """
        Return a chain of certificate that points from a ROOT CA to a certificate.

        :returns: a CertTree whose only ROOT CA is the first element of the
            chain, or None if no chain was found.
        """

        def _rec_getchain(chain, curtree):
            # See if an element of the current tree signs the cert, if so add it to
            # the chain, else recurse.
            for c, subtree in curtree:
                curchain = chain + [c]
                # If 'cert' is issued by c
                if cert.isIssuer(c) or c == cert:
                    # Final node of the chain !
                    # (add the final cert if not self signed)
                    if c != cert:
                        curchain += [cert]
                    return curchain
                else:
                    # Not the final node of the chain ! Recurse.
                    curchain = _rec_getchain(curchain, subtree)
                    if curchain:
                        return curchain
            return None

        chain = _rec_getchain([], self.tree)
        if chain is not None:
            # We add the first certificate to the ROOT in all cases
            return CertTree(chain, [chain[0]])
        else:
            return None

    def verify(self, cert):
        """
        Verify that a certificate is properly signed.

        :raises ValueError: if no chain to this certificate can be found.
        """
        # Check that we can find a chain to this certificate
        if not self.getchain(cert):
            raise ValueError("Certificate verification failed !")

    def show(self, ret: bool = False):
        """
        Return the CertTree as a string certificate tree

        :param ret: if True, return the string instead of printing it.
        """

        def _rec_show(c, children, lvl=0):
            s = ""
            # Process the current CA
            if c:
                if not c.isSelfSigned():
                    s += "%s [Not Self Signed]\n" % c.subject_str
                else:
                    s += "%s [Self Signed]\n" % c.subject_str
                s = lvl * " " + s
                lvl += 1
            # Process all sub-CAs at a lower level
            for child, subchildren in children:
                s += _rec_show(child, subchildren, lvl=lvl)
            return s

        # The top-level call has no certificate of its own: it only walks
        # through the ROOT CAs.
        showed = _rec_show(None, self.tree)
        if ret:
            return showed
        else:
            print(showed)

    def __repr__(self):
        return "<CertTree %s certificates (%s ROOT CA)>" % (
            len(self),
            len(self.rootCAs),
        )

1700 

1701 

1702####### 

1703# CMS # 

1704####### 

1705 

1706# RFC3852 

1707 

1708 

1709class CMS_Engine: 

1710 """ 

1711 A utility class to perform CMS/PKCS7 operations, as specified by RFC3852. 

1712 

1713 :param store: a ROOT CA certificate list to trust. 

1714 :param crls: a list of CRLs to include. This is currently not checked. 

1715 """ 

1716 

def __init__(
    self,
    store: CertList,
    crls: Optional[List[X509_CRL]] = None,
):
    """
    :param store: a ROOT CA certificate list to trust.
    :param crls: a list of CRLs to include. Defaults to an empty list.
    """
    self.store = store
    # Build a new list for each instance: a list used as default value would
    # be shared between all the instances created without 'crls'.
    self.crls = [] if crls is None else crls

1724 

def _get_algorithms(self, key: PrivKey, h="sha256") -> ASN1_OID:
    """
    Get the algorithms matching a private key

    :param key: the private key that is going to be used to sign.
    :param h: the name of the requested hash. It is used for RSA and ECDSA
        keys only.
    :returns: a (signature algorithm OID, hash object, hash name) tuple.
        NOTE(review): the return annotation only mentions ASN1_OID although
        a 3-tuple is returned.
    :raises ValueError: if the hash or the type of key is not supported.
    """
    if isinstance(key, PrivKeyRSA):
        # RFC3370 sect 3.2
        return (
            ASN1_OID("rsaEncryption"),
            _get_hash(h),
            h,
        )

    if isinstance(key, PrivKeyECDSA):
        # RFC5753 sect 2.1.1
        ecdsa_algs = {
            "sha1": ("ecdsa-with-SHA1", hashes.SHA1),
            "sha224": ("ecdsa-with-SHA224", hashes.SHA224),
            "sha256": ("ecdsa-with-SHA256", hashes.SHA256),
            "sha384": ("ecdsa-with-SHA384", hashes.SHA384),
            "sha512": ("ecdsa-with-SHA512", hashes.SHA512),
        }
        try:
            oidname, hash_cls = ecdsa_algs[h]
        except (KeyError, TypeError):
            raise ValueError("Unknown hash for private key !") from None
        return (
            ASN1_OID(oidname),
            hash_cls(),
            h,
        )

    if isinstance(key, PrivKeyEdDSA):
        # RFC8419 sect 2.3
        # NOTE(review): these look like the X25519/X448 key-exchange classes
        # rather than the Ed25519/Ed448 signing ones - confirm against what
        # PrivKeyEdDSA stores in 'key'.
        if isinstance(key.key, x25519.X25519PrivateKey):
            return (
                ASN1_OID("Ed25519"),
                hashes.SHA512(),
                "sha512",
            )
        if isinstance(key.key, x448.X448PrivateKey):
            return (
                ASN1_OID("Ed448"),
                hashes.SHAKE256(64),
                "shake256",
            )

    # Fail explicitly instead of returning None, which the callers would
    # only notice when unpacking the result.
    raise ValueError("Unknown private key type !")

1786 

def sign(
    self,
    message: Union[bytes, Packet],
    eContentType: ASN1_OID,
    cert: Cert,
    key: PrivKey,
    dhash: Optional[str] = "sha256",
):
    """
    Sign a message using CMS.

    :param message: the inner content to sign.
    :param eContentType: the OID of the inner content.
    :param cert: the certificate whose key to use use for signing.
    :param key: the private key to use for signing.
    :param dhash: the hash to use for message digest (ECDSA only).

    We currently only support X.509 certificates !
    """
    sigalg, cdhash, dhash = self._get_algorithms(key, h=dhash)

    # RFC3852 5.4. Message Digest Calculation Process
    digest_ctx = hashes.Hash(cdhash)
    digest_ctx.update(bytes(message))
    hashed_message = digest_ctx.finalize()

    # RFC3852 5.5. Signature Generation Process
    # The signed attributes are the content type and the message digest.
    content_type_attr = X509_Attribute(
        type=ASN1_OID("contentType"),
        values=[
            X509_AttributeValue(value=eContentType),
        ],
    )
    message_digest_attr = X509_Attribute(
        type=ASN1_OID("messageDigest"),
        # "A message-digest attribute MUST have a single attribute value"
        values=[
            X509_AttributeValue(value=ASN1_STRING(hashed_message)),
        ],
    )
    signerInfo = CMS_SignerInfo(
        version=1,
        sid=CMS_IssuerAndSerialNumber(
            issuer=cert.tbsCertificate.issuer,
            serialNumber=cert.tbsCertificate.serialNumber,
        ),
        digestAlgorithm=X509_AlgorithmIdentifier(
            algorithm=ASN1_OID(dhash),
            parameters=ASN1_NULL(0),
        ),
        signedAttrs=[content_type_attr, message_digest_attr],
        signatureAlgorithm=X509_AlgorithmIdentifier(
            algorithm=sigalg,
            parameters=ASN1_NULL(0),
        ),
    )

    # The signature covers the signed attributes, serialized through
    # CMS_SignedAttrsForSignature.
    to_be_signed = bytes(
        CMS_SignedAttrsForSignature(
            signedAttrs=signerInfo.signedAttrs,
        )
    )
    signerInfo.signature = ASN1_STRING(key.sign(to_be_signed, h=dhash))

    # Build a chain of X509_Cert to ship (but skip the ROOT certificate)
    certTree = CertTree(cert, self.store)
    certificates = [
        entry.x509Cert for entry in certTree if not entry.isSelfSigned()
    ]

    # Optional fields are set to None when there is nothing to ship
    if certificates:
        shipped_certs = [
            CMS_CertificateChoices(certificate=x509) for x509 in certificates
        ]
    else:
        shipped_certs = None
    if self.crls:
        shipped_crls = [CMS_RevocationInfoChoice(crl=crl) for crl in self.crls]
    else:
        shipped_crls = None

    # Build final structure
    signedData = CMS_SignedData(
        version=3 if certificates else 1,
        digestAlgorithms=X509_AlgorithmIdentifier(
            algorithm=ASN1_OID(dhash),
            parameters=ASN1_NULL(0),
        ),
        encapContentInfo=CMS_EncapsulatedContentInfo(
            eContentType=eContentType,
            eContent=message,
        ),
        certificates=shipped_certs,
        crls=shipped_crls,
        signerInfos=[
            signerInfo,
        ],
    )
    return CMS_ContentInfo(
        contentType=ASN1_OID("id-signedData"),
        content=signedData,
    )

1887 

def verify(
    self,
    contentInfo: CMS_ContentInfo,
    eContentType: Optional[ASN1_OID] = None,
    eContent: Optional[bytes] = None,
    no_verify_cert: bool = False,
):
    """
    Verify a CMS message against the list of trusted certificates,
    and return the unpacked message if the verification succeeds.

    :param contentInfo: the ContentInfo whose signature to verify
    :param eContentType: if provided, verifies that the content type is valid
    :param eContent: in PKCS 7.1, provide the content to verify
    :param no_verify_cert: do not check the remote certificate (unsafe)
    :raises ValueError: if the message is not signed, carries no signature,
        has inconsistent or missing signed attributes, or if a signature is
        reported as invalid.
    :raises KeyError: (from CertList.findCertBySid) if the certificate of a
        signer cannot be found.
    """
    if contentInfo.contentType.oidname != "id-signedData":
        raise ValueError("ContentInfo isn't signed !")

    signeddata = contentInfo.content

    # Build the certificate chain
    certificates = []
    if signeddata.certificates:
        certificates = [Cert(x.certificate) for x in signeddata.certificates]
    certTree = CertTree(certificates, self.store)

    # Check there's at least one signature
    if not signeddata.signerInfos:
        raise ValueError("ContentInfo contained no signature !")

    # Check all signatures
    for signerInfo in signeddata.signerInfos:
        sigh = hash_by_oid[signerInfo.signatureAlgorithm.algorithm.val]

        # Find certificate in the chain that did this
        cert: Cert = certTree.findCertBySid(signerInfo.sid)

        # Verify certificate signature
        if not no_verify_cert:
            certTree.verify(cert)

        # Verify the message hash
        if signerInfo.signedAttrs:
            # Verify the contentType
            try:
                contentType = next(
                    x.values[0].value
                    for x in signerInfo.signedAttrs
                    if x.type.oidname == "contentType"
                )
            except StopIteration:
                raise ValueError("Missing contentType in signedAttrs !")

            if contentType != signeddata.encapContentInfo.eContentType:
                raise ValueError(
                    "Inconsistent 'contentType' was detected in packet !"
                )

            if eContentType is not None and eContentType != contentType:
                raise ValueError(
                    "Expected '%s' but got '%s' contentType !"
                    % (
                        eContentType,
                        contentType,
                    )
                )

            # Verify the messageDigest value
            try:
                # "A message-digest attribute MUST have a single attribute value"
                messageDigest = next(
                    x.values[0].value
                    for x in signerInfo.signedAttrs
                    if x.type.oidname == "messageDigest"
                )
            except StopIteration:
                raise ValueError("Missing messageDigest in signedAttrs !")

            # The content embedded in the message takes precedence over the
            # 'eContent' argument.
            if signeddata.encapContentInfo.eContent is not None:
                eContent = bytes(signeddata.encapContentInfo.eContent)
            elif eContent is None:
                raise ValueError("No eContent was provided !")

            # Re-calculate hash
            h = signerInfo.digestAlgorithm.algorithm.oidname
            digest_ctx = hashes.Hash(_get_hash(h))
            digest_ctx.update(eContent)
            hashed_message = digest_ctx.finalize()

            if hashed_message != messageDigest:
                raise ValueError("Invalid messageDigest value !")

            # The signature covers the signed attributes
            signed_msg = bytes(
                CMS_SignedAttrsForSignature(
                    signedAttrs=signerInfo.signedAttrs,
                )
            )
        else:
            # NOTE(review): without signed attributes, the whole
            # encapContentInfo is used as the signed message - confirm
            # against RFC 5652 sect 5.4.
            signed_msg = bytes(signeddata.encapContentInfo)

        # Verify the signature. The outcome must not be ignored: reject the
        # message when the verification explicitly reports a failure.
        # NOTE(review): assumes Cert.verify() returns False on an invalid
        # signature - confirm against PubKey.verify.
        valid = cert.verify(
            msg=signed_msg,
            sig=signerInfo.signature.val,
            h=sigh,
        )
        if valid is False:
            raise ValueError("Invalid signature !")

    # Return the content
    return signeddata.encapContentInfo.eContent