Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/tls/cert.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

893 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net> 

5# <arno@natisbad.org> 

6# 2015, 2016, 2017 Maxence Tury <maxence.tury@ssi.gouv.fr> 

7# 2022-2025 Gabriel Potter 

8 

9r""" 

10High-level methods for PKI objects (X.509 certificates, CRLs, CSR, Keys, CMS). 

11Supported keys include RSA, ECDSA and EdDSA. 

12 

13The classes below are wrappers for the ASN.1 objects defined in x509.py. 

14 

15Example 1: Certificate & Private key 

16____________________________________ 

17 

18For instance, here is what you could do in order to modify the subject public 

19key info of a 'cert' and then resign it with whatever 'key':: 

20 

21 >>> from scapy.layers.tls.cert import * 

22 >>> cert = Cert("cert.der") 

23 >>> k = PrivKeyRSA() # generate a private key 

24 >>> cert.setSubjectPublicKeyFromPrivateKey(k) 

25 >>> cert.resignWith(k) 

26 >>> cert.export("newcert.pem") 

27 >>> k.export("mykey.pem") 

28 

29One could also edit arguments like the serial number, as such:: 

30 

31 >>> from scapy.layers.tls.cert import * 

32 >>> c = Cert("mycert.pem") 

33 >>> c.tbsCertificate.serialNumber = 0x4B1D 

34 >>> k = PrivKey("mykey.pem") # import an existing private key 

35 >>> c.resignWith(k) 

36 >>> c.export("newcert.pem") 

37 

38To export the public key of a private key:: 

39 

40 >>> k = PrivKey("mykey.pem") 

41 >>> k.pubkey.export("mypubkey.pem") 

42 

43Example 2: CertList and CertTree 

44________________________________ 

45 

46Load a .pem file that contains multiple certificates:: 

47 

48 >>> l = CertList("ca_chain.pem") 

49 >>> l.show() 

50 0000 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test CA...] 

51 0001 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test Client...] 

52 

53Use 'CertTree' to organize the certificates in a tree:: 

54 

55 >>> tree = CertTree("ca_chain.pem") # or tree = CertTree(l) 

56 >>> tree.show() 

57 /C=Ulaanbaatar/OU=Scapy Test PKI/CN=Scapy Test CA [Self Signed] 

58 /C=FR/OU=Scapy Test PKI/CN=Scapy Test Client [Not Self Signed] 

59 

60Example 3: Certificate Signing Request (CSR) 

61____________________________________________ 

62 

63Scapy's :py:class:`~scapy.layers.tls.cert.CSR` class supports both PKCS#10 and CMC 

64formats. 

65 

66Load and display a CSR:: 

67 

68 >>> csr = CSR("cert.req") 

69 >>> csr 

70 [CSR Format: CMC, Subject:/O=TestOrg/CN=TestCN, Verified: True] 

71 >>> csr.certReq.show() 

72 ###[ PKCS10_CertificationRequest ]### 

73 \certificationRequestInfo\ 

74 |###[ PKCS10_CertificationRequestInfo ]### 

75 | version = 0x0 <ASN1_INTEGER[0] 

76 | \subject \ 

77 | |###[ X509_RDN ]### 

78 | | \rdn \ 

79 | | |###[ X509_AttributeTypeAndValue ]### 

80 | | | type = <ASN1_OID['organizationName']> 

81 | | | value = <ASN1_UTF8_STRING[b'TestOrg']> 

82 [...] 

83 

84Get its public key and verify its signature:: 

85 

86 >>> csr.pubkey 

87 <scapy.layers.tls.cert.PubKeyRSA at 0x7f3481149310> 

88 >>> csr.verifySelf() 

89 True 

90 

91No need for obnoxious openssl tweaking anymore. :) 

92""" 

93 

94import base64 

95import enum 

96import os 

97import time 

98import warnings 

99 

100from scapy.config import conf, crypto_validator 

101from scapy.compat import Self 

102from scapy.error import warning 

103from scapy.utils import binrepr 

104from scapy.asn1.asn1 import ( 

105 ASN1_BIT_STRING, 

106 ASN1_NULL, 

107 ASN1_OID, 

108 ASN1_STRING, 

109) 

110from scapy.asn1.mib import hash_by_oid 

111from scapy.packet import Packet 

112from scapy.layers.x509 import ( 

113 CMS_CertificateChoices, 

114 CMS_ContentInfo, 

115 CMS_EncapsulatedContentInfo, 

116 CMS_IssuerAndSerialNumber, 

117 CMS_RevocationInfoChoice, 

118 CMS_SignedAttrsForSignature, 

119 CMS_SignedData, 

120 CMS_SignerInfo, 

121 CMS_SubjectKeyIdentifier, 

122 ECDSAPrivateKey_OpenSSL, 

123 ECDSAPrivateKey, 

124 ECDSAPublicKey, 

125 EdDSAPrivateKey, 

126 EdDSAPublicKey, 

127 PKCS10_CertificationRequest, 

128 RSAPrivateKey_OpenSSL, 

129 RSAPrivateKey, 

130 RSAPublicKey, 

131 X509_AlgorithmIdentifier, 

132 X509_Attribute, 

133 X509_AttributeValue, 

134 X509_Cert, 

135 X509_CRL, 

136 X509_SubjectPublicKeyInfo, 

137) 

138from scapy.layers.tls.crypto.pkcs1 import ( 

139 _DecryptAndSignRSA, 

140 _EncryptAndVerifyRSA, 

141 _get_hash, 

142 pkcs_os2ip, 

143) 

144from scapy.compat import bytes_encode 

145 

146# Typing imports 

147from typing import ( 

148 List, 

149 Optional, 

150 Union, 

151) 

152 

153if conf.crypto_valid: 

154 from cryptography.exceptions import InvalidSignature 

155 from cryptography.hazmat.backends import default_backend 

156 from cryptography.hazmat.primitives import hashes 

157 from cryptography.hazmat.primitives import serialization 

158 from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519 

159 

160 # cryptography raised the minimum RSA key length to 1024 in 43.0+ 

161 # https://github.com/pyca/cryptography/pull/10278 

162 # but we need still 512 for EXPORT40 ciphers (yes EXPORT is terrible) 

163 # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66 

164 # The following detects the change and hacks around it using the backend 

165 

166 try: 

167 rsa.generate_private_key(public_exponent=65537, key_size=512) 

168 _RSA_512_SUPPORTED = True 

169 except ValueError: 

170 # cryptography > 43.0 

171 _RSA_512_SUPPORTED = False 

172 from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl 

173 

174 

175# Maximum allowed size in bytes for a certificate file, to avoid 

176# loading huge file when importing a cert 

177_MAX_KEY_SIZE = 50 * 1024 

178_MAX_CERT_SIZE = 50 * 1024 

179_MAX_CRL_SIZE = 10 * 1024 * 1024 # some are that big 

180_MAX_CSR_SIZE = 50 * 1024 

181 

182 

183##################################################################### 

184# Some helpers 

185##################################################################### 

186 

187 

188@conf.commands.register 

189def der2pem(der_string, obj="UNKNOWN"): 

190 """Convert DER octet string to PEM format (with optional header)""" 

191 # Encode a byte string in PEM format. Header advertises <obj> type. 

192 pem_string = "-----BEGIN %s-----\n" % obj 

193 base64_string = base64.b64encode(der_string).decode() 

194 chunks = [base64_string[i : i + 64] for i in range(0, len(base64_string), 64)] 

195 pem_string += "\n".join(chunks) 

196 pem_string += "\n-----END %s-----\n" % obj 

197 return pem_string 

198 

199 

200@conf.commands.register 

201def pem2der(pem_string): 

202 """Convert PEM string to DER format""" 

203 # Encode all lines between the first '-----\n' and the 2nd-to-last '-----'. 

204 pem_string = pem_string.replace(b"\r", b"") 

205 first_idx = pem_string.find(b"-----\n") + 6 

206 if pem_string.find(b"-----BEGIN", first_idx) != -1: 

207 raise Exception("pem2der() expects only one PEM-encoded object") 

208 last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----")) 

209 base64_string = pem_string[first_idx:last_idx] 

210 base64_string.replace(b"\n", b"") 

211 der_string = base64.b64decode(base64_string) 

212 return der_string 

213 

214 

215def split_pem(s): 

216 """ 

217 Split PEM objects. Useful to process concatenated certificates. 

218 """ 

219 pem_strings = [] 

220 while s != b"": 

221 start_idx = s.find(b"-----BEGIN") 

222 if start_idx == -1: 

223 break 

224 end_idx = s.find(b"-----END") 

225 if end_idx == -1: 

226 raise Exception("Invalid PEM object (missing END tag)") 

227 end_idx = s.find(b"\n", end_idx) + 1 

228 if end_idx == 0: 

229 # There is no final \n 

230 end_idx = len(s) 

231 pem_strings.append(s[start_idx:end_idx]) 

232 s = s[end_idx:] 

233 return pem_strings 

234 

235 

236class _PKIObj(object): 

237 def __init__(self, frmt, der): 

238 self.frmt = frmt 

239 self._der = der 

240 

241 

242class _PKIObjMaker(type): 

243 def __call__(cls, obj_path, obj_max_size, pem_marker=None): 

244 # This enables transparent DER and PEM-encoded data imports. 

245 # Note that when importing a PEM file with multiple objects (like ECDSA 

246 # private keys output by openssl), it will concatenate every object in 

247 # order to create a 'der' attribute. When converting a 'multi' DER file 

248 # into a PEM file, though, the PEM attribute will not be valid, 

249 # because we do not try to identify the class of each object. 

250 error_msg = "Unable to import data" 

251 

252 if obj_path is None: 

253 raise Exception(error_msg) 

254 obj_path = bytes_encode(obj_path) 

255 

256 if (b"\x00" not in obj_path) and os.path.isfile(obj_path): 

257 _size = os.path.getsize(obj_path) 

258 if _size > obj_max_size: 

259 raise Exception(error_msg) 

260 try: 

261 with open(obj_path, "rb") as f: 

262 _raw = f.read() 

263 except Exception: 

264 raise Exception(error_msg) 

265 else: 

266 _raw = obj_path 

267 

268 try: 

269 if b"-----BEGIN" in _raw: 

270 frmt = "PEM" 

271 pem = _raw 

272 der_list = split_pem(pem) 

273 der = b"".join(map(pem2der, der_list)) 

274 else: 

275 frmt = "DER" 

276 der = _raw 

277 except Exception: 

278 raise Exception(error_msg) 

279 

280 p = _PKIObj(frmt, der) 

281 return p 

282 

283 

284##################################################################### 

285# PKI objects wrappers 

286##################################################################### 

287 

288############### 

289# Public Keys # 

290############### 

291 

292 

293class _PubKeyFactory(_PKIObjMaker): 

294 """ 

295 Metaclass for PubKey creation. 

296 It casts the appropriate class on the fly, then fills in 

297 the appropriate attributes with import_from_asn1pkt() submethod. 

298 """ 

299 

300 def __call__(cls, key_path=None, cryptography_obj=None): 

301 # This allows to import cryptography objects directly 

302 if cryptography_obj is not None: 

303 obj = type.__call__(cls) 

304 obj.__class__ = cls 

305 obj.frmt = "original" 

306 obj.marker = "PUBLIC KEY" 

307 obj.pubkey = cryptography_obj 

308 return obj 

309 

310 if key_path is None: 

311 obj = type.__call__(cls) 

312 if cls is PubKey: 

313 cls = PubKeyRSA 

314 obj.__class__ = cls 

315 obj.frmt = "original" 

316 obj.fill_and_store() 

317 return obj 

318 

319 # This deals with the rare RSA 'kx export' call. 

320 if isinstance(key_path, tuple): 

321 obj = type.__call__(cls) 

322 obj.__class__ = PubKeyRSA 

323 obj.frmt = "tuple" 

324 obj.import_from_tuple(key_path) 

325 return obj 

326 

327 # Now for the usual calls, key_path may be the path to either: 

328 # _an X509_SubjectPublicKeyInfo, as processed by openssl; 

329 # _an RSAPublicKey; 

330 # _an ECDSAPublicKey; 

331 # _an EdDSAPublicKey. 

332 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE) 

333 try: 

334 spki = X509_SubjectPublicKeyInfo(obj._der) 

335 pubkey = spki.subjectPublicKey 

336 if isinstance(pubkey, RSAPublicKey): 

337 obj.__class__ = PubKeyRSA 

338 obj.import_from_asn1pkt(pubkey) 

339 elif isinstance(pubkey, ECDSAPublicKey): 

340 obj.__class__ = PubKeyECDSA 

341 obj.import_from_der(obj._der) 

342 elif isinstance(pubkey, EdDSAPublicKey): 

343 obj.__class__ = PubKeyEdDSA 

344 obj.import_from_der(obj._der) 

345 else: 

346 raise 

347 obj.marker = "PUBLIC KEY" 

348 except Exception: 

349 try: 

350 pubkey = RSAPublicKey(obj._der) 

351 obj.__class__ = PubKeyRSA 

352 obj.import_from_asn1pkt(pubkey) 

353 obj.marker = "RSA PUBLIC KEY" 

354 except Exception: 

355 # We cannot import an ECDSA public key without curve knowledge 

356 if conf.debug_dissector: 

357 raise 

358 raise Exception("Unable to import public key") 

359 return obj 

360 

361 

362class PubKey(metaclass=_PubKeyFactory): 

363 """ 

364 Parent class for PubKeyRSA, PubKeyECDSA and PubKeyEdDSA. 

365 Provides common verifyCert() and export() methods. 

366 """ 

367 

368 def verifyCert(self, cert): 

369 """Verifies either a Cert or an X509_Cert.""" 

370 h = _get_cert_sig_hashname(cert) 

371 tbsCert = cert.tbsCertificate 

372 sigVal = bytes(cert.signatureValue) 

373 return self.verify(bytes(tbsCert), sigVal, h=h, t="pkcs") 

374 

375 def verifyCsr(self, csr): 

376 """Verifies a CSR.""" 

377 h = _get_csr_sig_hashname(csr) 

378 certReqInfo = csr.certReq.certificationRequestInfo 

379 sigVal = bytes(csr.certReq.signature) 

380 return self.verify(bytes(certReqInfo), sigVal, h=h, t="pkcs") 

381 

382 @property 

383 def pem(self): 

384 return der2pem(self.der, self.marker) 

385 

386 @property 

387 def der(self): 

388 return self.pubkey.public_bytes( 

389 encoding=serialization.Encoding.DER, 

390 format=serialization.PublicFormat.SubjectPublicKeyInfo, 

391 ) 

392 

393 def public_numbers(self, *args, **kwargs): 

394 return self.pubkey.public_numbers(*args, **kwargs) 

395 

396 @property 

397 def key_size(self): 

398 return self.pubkey.key_size 

399 

400 def export(self, filename, fmt=None): 

401 """ 

402 Export public key in 'fmt' format (DER or PEM) to file 'filename' 

403 """ 

404 if fmt is None: 

405 if filename.endswith(".pem"): 

406 fmt = "PEM" 

407 else: 

408 fmt = "DER" 

409 with open(filename, "wb") as f: 

410 if fmt == "DER": 

411 return f.write(self.der) 

412 elif fmt == "PEM": 

413 return f.write(self.pem.encode()) 

414 

415 @crypto_validator 

416 def verify(self, msg, sig, h="sha256", **kwargs): 

417 """ 

418 Verify signed data. 

419 """ 

420 raise NotImplementedError 

421 

422 

423class PubKeyRSA(PubKey, _EncryptAndVerifyRSA): 

424 """ 

425 Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py 

426 Use the 'key' attribute to access original object. 

427 """ 

428 

429 @crypto_validator 

430 def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None): 

431 pubExp = pubExp or 65537 

432 if not modulus: 

433 real_modulusLen = modulusLen or 2048 

434 if real_modulusLen < 1024 and not _RSA_512_SUPPORTED: 

435 # cryptography > 43.0 compatibility 

436 private_key = rust_openssl.rsa.generate_private_key( 

437 public_exponent=pubExp, 

438 key_size=real_modulusLen, 

439 ) 

440 else: 

441 private_key = rsa.generate_private_key( 

442 public_exponent=pubExp, 

443 key_size=real_modulusLen, 

444 backend=default_backend(), 

445 ) 

446 self.pubkey = private_key.public_key() 

447 else: 

448 real_modulusLen = len(binrepr(modulus)) 

449 if modulusLen and real_modulusLen != modulusLen: 

450 warning("modulus and modulusLen do not match!") 

451 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp) 

452 self.pubkey = pubNum.public_key(default_backend()) 

453 

454 self.marker = "PUBLIC KEY" 

455 

456 # Lines below are only useful for the legacy part of pkcs1.py 

457 pubNum = self.pubkey.public_numbers() 

458 self._modulusLen = real_modulusLen 

459 self._modulus = pubNum.n 

460 self._pubExp = pubNum.e 

461 

462 @crypto_validator 

463 def import_from_tuple(self, tup): 

464 # this is rarely used 

465 e, m, mLen = tup 

466 if isinstance(m, bytes): 

467 m = pkcs_os2ip(m) 

468 if isinstance(e, bytes): 

469 e = pkcs_os2ip(e) 

470 self.fill_and_store(modulus=m, pubExp=e) 

471 

472 def import_from_asn1pkt(self, pubkey): 

473 modulus = pubkey.modulus.val 

474 pubExp = pubkey.publicExponent.val 

475 self.fill_and_store(modulus=modulus, pubExp=pubExp) 

476 

477 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): 

478 # no ECDSA encryption support, hence no ECDSA specific keywords here 

479 return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L) 

480 

481 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 

482 return _EncryptAndVerifyRSA.verify(self, msg, sig, t=t, h=h, mgf=mgf, L=L) 

483 

484 

485class PubKeyECDSA(PubKey): 

486 """ 

487 Wrapper for ECDSA keys based on the cryptography library. 

488 Use the 'key' attribute to access original object. 

489 """ 

490 

491 @crypto_validator 

492 def fill_and_store(self, curve=None): 

493 curve = curve or ec.SECP256R1 

494 private_key = ec.generate_private_key(curve(), default_backend()) 

495 self.pubkey = private_key.public_key() 

496 

497 @crypto_validator 

498 def import_from_der(self, pubkey): 

499 # No lib support for explicit curves nor compressed points. 

500 self.pubkey = serialization.load_der_public_key( 

501 pubkey, 

502 backend=default_backend(), 

503 ) 

504 

505 def encrypt(self, msg, h="sha256", **kwargs): 

506 raise Exception("No ECDSA encryption support") 

507 

508 @crypto_validator 

509 def verify(self, msg, sig, h="sha256", **kwargs): 

510 # 'sig' should be a DER-encoded signature, as per RFC 3279 

511 try: 

512 self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h))) 

513 return True 

514 except InvalidSignature: 

515 return False 

516 

517 

518class PubKeyEdDSA(PubKey): 

519 """ 

520 Wrapper for EdDSA keys based on the cryptography library. 

521 Use the 'key' attribute to access original object. 

522 """ 

523 

524 @crypto_validator 

525 def fill_and_store(self, curve=None): 

526 curve = curve or x25519.X25519PrivateKey 

527 private_key = curve.generate() 

528 self.pubkey = private_key.public_key() 

529 

530 @crypto_validator 

531 def import_from_der(self, pubkey): 

532 self.pubkey = serialization.load_der_public_key( 

533 pubkey, 

534 backend=default_backend(), 

535 ) 

536 

537 def encrypt(self, msg, **kwargs): 

538 raise Exception("No EdDSA encryption support") 

539 

540 @crypto_validator 

541 def verify(self, msg, sig, **kwargs): 

542 # 'sig' should be a DER-encoded signature, as per RFC 3279 

543 try: 

544 self.pubkey.verify(sig, msg) 

545 return True 

546 except InvalidSignature: 

547 return False 

548 

549 

550################ 

551# Private Keys # 

552################ 

553 

554 

555class _PrivKeyFactory(_PKIObjMaker): 

556 """ 

557 Metaclass for PrivKey creation. 

558 It casts the appropriate class on the fly, then fills in 

559 the appropriate attributes with import_from_asn1pkt() submethod. 

560 """ 

561 

562 def __call__(cls, key_path=None, cryptography_obj=None): 

563 """ 

564 key_path may be the path to either: 

565 _an RSAPrivateKey_OpenSSL (as generated by openssl); 

566 _an ECDSAPrivateKey_OpenSSL (as generated by openssl); 

567 _an RSAPrivateKey; 

568 _an ECDSAPrivateKey. 

569 """ 

570 if key_path is None: 

571 obj = type.__call__(cls) 

572 if cls is PrivKey: 

573 cls = PrivKeyECDSA 

574 obj.__class__ = cls 

575 obj.frmt = "original" 

576 obj.fill_and_store() 

577 return obj 

578 

579 # This allows to import cryptography objects directly 

580 if cryptography_obj is not None: 

581 # We (stupidly) need to go through the whole import process because RSA 

582 # does more than just importing the cryptography objects... 

583 obj = _PKIObj( 

584 "DER", 

585 cryptography_obj.private_bytes( 

586 encoding=serialization.Encoding.DER, 

587 format=serialization.PrivateFormat.PKCS8, 

588 encryption_algorithm=serialization.NoEncryption(), 

589 ), 

590 ) 

591 else: 

592 # Load from file 

593 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE) 

594 

595 try: 

596 privkey = RSAPrivateKey_OpenSSL(obj._der) 

597 privkey = privkey.privateKey 

598 obj.__class__ = PrivKeyRSA 

599 obj.marker = "PRIVATE KEY" 

600 except Exception: 

601 try: 

602 privkey = ECDSAPrivateKey_OpenSSL(obj._der) 

603 privkey = privkey.privateKey 

604 obj.__class__ = PrivKeyECDSA 

605 obj.marker = "EC PRIVATE KEY" 

606 except Exception: 

607 try: 

608 privkey = RSAPrivateKey(obj._der) 

609 obj.__class__ = PrivKeyRSA 

610 obj.marker = "RSA PRIVATE KEY" 

611 except Exception: 

612 try: 

613 privkey = ECDSAPrivateKey(obj._der) 

614 obj.__class__ = PrivKeyECDSA 

615 obj.marker = "EC PRIVATE KEY" 

616 except Exception: 

617 try: 

618 privkey = EdDSAPrivateKey(obj._der) 

619 obj.__class__ = PrivKeyEdDSA 

620 obj.marker = "PRIVATE KEY" 

621 except Exception: 

622 raise Exception("Unable to import private key") 

623 try: 

624 obj.import_from_asn1pkt(privkey) 

625 except ImportError: 

626 pass 

627 return obj 

628 

629 

630class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING): 

631 """A ASN1_BIT_STRING that ignores BER encoding""" 

632 

633 def __bytes__(self): 

634 return self.val_readable 

635 

636 __str__ = __bytes__ 

637 

638 

639class PrivKey(metaclass=_PrivKeyFactory): 

640 """ 

641 Parent class for PrivKeyRSA, PrivKeyECDSA and PrivKeyEdDSA. 

642 Provides common signTBSCert(), resignCert(), verifyCert() 

643 and export() methods. 

644 """ 

645 

646 def signTBSCert(self, tbsCert, h="sha256"): 

647 """ 

648 Note that this will always copy the signature field from the 

649 tbsCertificate into the signatureAlgorithm field of the result, 

650 regardless of the coherence between its contents (which might 

651 indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2). 

652 

653 There is a small inheritance trick for the computation of sigVal 

654 below: in order to use a sign() method which would apply 

655 to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the 

656 subclasses accept any argument, be it from the RSA or ECDSA world, 

657 and then they keep the ones they're interested in. 

658 Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign(). 

659 """ 

660 sigAlg = tbsCert.signature 

661 h = h or hash_by_oid[sigAlg.algorithm.val] 

662 sigVal = self.sign(bytes(tbsCert), h=h, t="pkcs") 

663 c = X509_Cert() 

664 c.tbsCertificate = tbsCert 

665 c.signatureAlgorithm = sigAlg 

666 c.signatureValue = _Raw_ASN1_BIT_STRING(sigVal, readable=True) 

667 return c 

668 

669 def resignCert(self, cert): 

670 """Rewrite the signature of either a Cert or an X509_Cert.""" 

671 return self.signTBSCert(cert.tbsCertificate, h=None) 

672 

673 def verifyCert(self, cert): 

674 """Verifies either a Cert or an X509_Cert.""" 

675 return self.pubkey.verifyCert(cert) 

676 

677 def verifyCsr(self, cert): 

678 """Verifies either a CSR.""" 

679 return self.pubkey.verifyCsr(cert) 

680 

681 @property 

682 def pem(self): 

683 return der2pem(self.der, self.marker) 

684 

685 @property 

686 def der(self): 

687 return self.key.private_bytes( 

688 encoding=serialization.Encoding.DER, 

689 format=serialization.PrivateFormat.PKCS8, 

690 encryption_algorithm=serialization.NoEncryption(), 

691 ) 

692 

693 def export(self, filename, fmt=None): 

694 """ 

695 Export private key in 'fmt' format (DER or PEM) to file 'filename' 

696 """ 

697 if fmt is None: 

698 if filename.endswith(".pem"): 

699 fmt = "PEM" 

700 else: 

701 fmt = "DER" 

702 with open(filename, "wb") as f: 

703 if fmt == "DER": 

704 return f.write(self.der) 

705 elif fmt == "PEM": 

706 return f.write(self.pem.encode()) 

707 

708 @crypto_validator 

709 def sign(self, data, h="sha256", **kwargs): 

710 """ 

711 Sign data. 

712 """ 

713 raise NotImplementedError 

714 

715 @crypto_validator 

716 def verify(self, msg, sig, h="sha256", **kwargs): 

717 """ 

718 Verify signed data. 

719 """ 

720 raise NotImplementedError 

721 

722 

723class PrivKeyRSA(PrivKey, _DecryptAndSignRSA): 

724 """ 

725 Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py 

726 Use the 'key' attribute to access original object. 

727 """ 

728 

729 @crypto_validator 

730 def fill_and_store( 

731 self, 

732 modulus=None, 

733 modulusLen=None, 

734 pubExp=None, 

735 prime1=None, 

736 prime2=None, 

737 coefficient=None, 

738 exponent1=None, 

739 exponent2=None, 

740 privExp=None, 

741 ): 

742 pubExp = pubExp or 65537 

743 if None in [ 

744 modulus, 

745 prime1, 

746 prime2, 

747 coefficient, 

748 privExp, 

749 exponent1, 

750 exponent2, 

751 ]: 

752 # note that the library requires every parameter 

753 # in order to call RSAPrivateNumbers(...) 

754 # if one of these is missing, we generate a whole new key 

755 real_modulusLen = modulusLen or 2048 

756 if real_modulusLen < 1024 and not _RSA_512_SUPPORTED: 

757 # cryptography > 43.0 compatibility 

758 self.key = rust_openssl.rsa.generate_private_key( 

759 public_exponent=pubExp, 

760 key_size=real_modulusLen, 

761 ) 

762 else: 

763 self.key = rsa.generate_private_key( 

764 public_exponent=pubExp, 

765 key_size=real_modulusLen, 

766 backend=default_backend(), 

767 ) 

768 pubkey = self.key.public_key() 

769 else: 

770 real_modulusLen = len(binrepr(modulus)) 

771 if modulusLen and real_modulusLen != modulusLen: 

772 warning("modulus and modulusLen do not match!") 

773 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp) 

774 privNum = rsa.RSAPrivateNumbers( 

775 p=prime1, 

776 q=prime2, 

777 dmp1=exponent1, 

778 dmq1=exponent2, 

779 iqmp=coefficient, 

780 d=privExp, 

781 public_numbers=pubNum, 

782 ) 

783 self.key = privNum.private_key(default_backend()) 

784 pubkey = self.key.public_key() 

785 

786 self.marker = "PRIVATE KEY" 

787 

788 # Lines below are only useful for the legacy part of pkcs1.py 

789 pubNum = pubkey.public_numbers() 

790 self._modulusLen = real_modulusLen 

791 self._modulus = pubNum.n 

792 self._pubExp = pubNum.e 

793 

794 self.pubkey = PubKeyRSA((pubNum.e, pubNum.n, real_modulusLen)) 

795 

796 def import_from_asn1pkt(self, privkey): 

797 modulus = privkey.modulus.val 

798 pubExp = privkey.publicExponent.val 

799 privExp = privkey.privateExponent.val 

800 prime1 = privkey.prime1.val 

801 prime2 = privkey.prime2.val 

802 exponent1 = privkey.exponent1.val 

803 exponent2 = privkey.exponent2.val 

804 coefficient = privkey.coefficient.val 

805 self.fill_and_store( 

806 modulus=modulus, 

807 pubExp=pubExp, 

808 privExp=privExp, 

809 prime1=prime1, 

810 prime2=prime2, 

811 exponent1=exponent1, 

812 exponent2=exponent2, 

813 coefficient=coefficient, 

814 ) 

815 

816 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 

817 return self.pubkey.verify( 

818 msg=msg, 

819 sig=sig, 

820 t=t, 

821 h=h, 

822 mgf=mgf, 

823 L=L, 

824 ) 

825 

826 def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None): 

827 return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L) 

828 

829 

830class PrivKeyECDSA(PrivKey): 

831 """ 

832 Wrapper for ECDSA keys based on SigningKey from ecdsa library. 

833 Use the 'key' attribute to access original object. 

834 """ 

835 

836 @crypto_validator 

837 def fill_and_store(self, curve=None): 

838 curve = curve or ec.SECP256R1 

839 self.key = ec.generate_private_key(curve(), default_backend()) 

840 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key()) 

841 self.marker = "EC PRIVATE KEY" 

842 

843 @crypto_validator 

844 def import_from_asn1pkt(self, privkey): 

845 self.key = serialization.load_der_private_key( 

846 bytes(privkey), None, backend=default_backend() 

847 ) 

848 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key()) 

849 self.marker = "EC PRIVATE KEY" 

850 

851 @crypto_validator 

852 def verify(self, msg, sig, h="sha256", **kwargs): 

853 return self.pubkey.verify(msg=msg, sig=sig, h=h, **kwargs) 

854 

855 @crypto_validator 

856 def sign(self, data, h="sha256", **kwargs): 

857 return self.key.sign(data, ec.ECDSA(_get_hash(h))) 

858 

859 

860class PrivKeyEdDSA(PrivKey): 

861 """ 

862 Wrapper for EdDSA keys 

863 Use the 'key' attribute to access original object. 

864 """ 

865 

866 @crypto_validator 

867 def fill_and_store(self, curve=None): 

868 curve = curve or x25519.X25519PrivateKey 

869 self.key = curve.generate() 

870 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key()) 

871 self.marker = "PRIVATE KEY" 

872 

873 @crypto_validator 

874 def import_from_asn1pkt(self, privkey): 

875 self.key = serialization.load_der_private_key( 

876 bytes(privkey), None, backend=default_backend() 

877 ) 

878 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key()) 

879 self.marker = "PRIVATE KEY" 

880 

881 @crypto_validator 

882 def verify(self, msg, sig, **kwargs): 

883 return self.pubkey.verify(msg=msg, sig=sig, **kwargs) 

884 

885 @crypto_validator 

886 def sign(self, data, **kwargs): 

887 return self.key.sign(data) 

888 

889 

890################ 

891# Certificates # 

892################ 

893 

894 

895class _CertMaker(_PKIObjMaker): 

896 """ 

897 Metaclass for Cert creation. It is not necessary as it was for the keys, 

898 but we reuse the model instead of creating redundant constructors. 

899 """ 

900 

901 def __call__(cls, cert_path=None, cryptography_obj=None): 

902 # This allows to import cryptography objects directly 

903 if cryptography_obj is not None: 

904 obj = _PKIObj( 

905 "DER", 

906 cryptography_obj.public_bytes( 

907 encoding=serialization.Encoding.DER, 

908 ), 

909 ) 

910 else: 

911 # Load from file 

912 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CERT_SIZE, "CERTIFICATE") 

913 obj.__class__ = Cert 

914 obj.marker = "CERTIFICATE" 

915 try: 

916 cert = X509_Cert(obj._der) 

917 except Exception: 

918 if conf.debug_dissector: 

919 raise 

920 raise Exception("Unable to import certificate") 

921 obj.import_from_asn1pkt(cert) 

922 return obj 

923 

924 

925def _get_cert_sig_hashname(cert): 

926 """ 

927 Return the hash associated with the signature algorithm of a certificate. 

928 """ 

929 tbsCert = cert.tbsCertificate 

930 sigAlg = tbsCert.signature 

931 return hash_by_oid[sigAlg.algorithm.val] 

932 

933 

934def _get_csr_sig_hashname(csr): 

935 """ 

936 Return the hash associated with the signature algorithm of a CSR. 

937 """ 

938 certReq = csr.certReq 

939 sigAlg = certReq.signatureAlgorithm 

940 return hash_by_oid[sigAlg.algorithm.val] 

941 

942 

943class Cert(metaclass=_CertMaker): 

944 """ 

945 Wrapper for the X509_Cert from layers/x509.py. 

946 Use the 'x509Cert' attribute to access original object. 

947 """ 

948 

949 def import_from_asn1pkt(self, cert): 

950 error_msg = "Unable to import certificate" 

951 

952 self.x509Cert = cert 

953 

954 tbsCert = cert.tbsCertificate 

955 

956 if tbsCert.version: 

957 self.version = tbsCert.version.val + 1 

958 else: 

959 self.version = 1 

960 self.serial = tbsCert.serialNumber.val 

961 self.sigAlg = tbsCert.signature.algorithm.oidname 

962 self.issuer = tbsCert.get_issuer() 

963 self.issuer_str = tbsCert.get_issuer_str() 

964 self.issuer_hash = hash(self.issuer_str) 

965 self.subject = tbsCert.get_subject() 

966 self.subject_str = tbsCert.get_subject_str() 

967 self.subject_hash = hash(self.subject_str) 

968 self.authorityKeyID = None 

969 

970 self.notBefore_str = tbsCert.validity.not_before.pretty_time 

971 try: 

972 self.notBefore = tbsCert.validity.not_before.datetime.timetuple() 

973 except ValueError: 

974 raise Exception(error_msg) 

975 self.notBefore_str_simple = time.strftime("%x", self.notBefore) 

976 

977 self.notAfter_str = tbsCert.validity.not_after.pretty_time 

978 try: 

979 self.notAfter = tbsCert.validity.not_after.datetime.timetuple() 

980 except ValueError: 

981 raise Exception(error_msg) 

982 self.notAfter_str_simple = time.strftime("%x", self.notAfter) 

983 

984 self.pubkey = PubKey(bytes(tbsCert.subjectPublicKeyInfo)) 

985 

986 if tbsCert.extensions: 

987 for extn in tbsCert.extensions: 

988 if extn.extnID.oidname == "basicConstraints": 

989 self.cA = False 

990 if extn.extnValue.cA: 

991 self.cA = not (extn.extnValue.cA.val == 0) 

992 elif extn.extnID.oidname == "keyUsage": 

993 self.keyUsage = extn.extnValue.get_keyUsage() 

994 elif extn.extnID.oidname == "extKeyUsage": 

995 self.extKeyUsage = extn.extnValue.get_extendedKeyUsage() 

996 elif extn.extnID.oidname == "authorityKeyIdentifier": 

997 self.authorityKeyID = extn.extnValue.keyIdentifier.val 

998 

999 self.signatureValue = bytes(cert.signatureValue) 

1000 self.signatureLen = len(self.signatureValue) 

1001 

1002 def isIssuer(self, other): 

1003 """ 

1004 True if 'other' issued 'self', i.e.: 

1005 - self.issuer == other.subject 

1006 - self is signed by other 

1007 """ 

1008 if self.issuer_hash != other.subject_hash: 

1009 return False 

1010 return other.pubkey.verifyCert(self) 

1011 

1012 def isIssuerCert(self, other): 

1013 return self.isIssuer(other) 

1014 

1015 def isSelfSigned(self): 

1016 """ 

1017 Return True if the certificate is self-signed: 

1018 - issuer and subject are the same 

1019 - the signature of the certificate is valid. 

1020 """ 

1021 if self.issuer_hash == self.subject_hash: 

1022 return self.isIssuer(self) 

1023 return False 

1024 

1025 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): 

1026 # no ECDSA *encryption* support, hence only RSA specific keywords here 

1027 return self.pubkey.encrypt(msg, t=t, h=h, mgf=mgf, L=L) 

1028 

1029 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 

1030 return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L) 

1031 

1032 def getSignatureHash(self): 

1033 """ 

1034 Return the hash cryptography object used by the 'signatureAlgorithm' 

1035 """ 

1036 return _get_hash(_get_cert_sig_hashname(self)) 

1037 

1038 def setSubjectPublicKeyFromPrivateKey(self, key): 

1039 """ 

1040 Replace the subjectPublicKeyInfo of this certificate with the one from 

1041 the provided key. 

1042 """ 

1043 if isinstance(key, (PubKey, PrivKey)): 

1044 if isinstance(key, PrivKey): 

1045 pubkey = key.pubkey 

1046 else: 

1047 pubkey = key 

1048 self.tbsCertificate.subjectPublicKeyInfo = X509_SubjectPublicKeyInfo( 

1049 pubkey.der 

1050 ) 

1051 else: 

1052 raise ValueError("Unknown type 'key', should be PubKey or PrivKey") 

1053 

1054 def resignWith(self, key): 

1055 """ 

1056 Resign a certificate with a specific key 

1057 """ 

1058 self.import_from_asn1pkt(key.resignCert(self)) 

1059 

1060 def remainingDays(self, now=None): 

1061 """ 

1062 Based on the value of notAfter field, returns the number of 

1063 days the certificate will still be valid. The date used for the 

1064 comparison is the current and local date, as returned by 

1065 time.localtime(), except if 'now' argument is provided another 

1066 one. 'now' argument can be given as either a time tuple or a string 

1067 representing the date. Accepted format for the string version 

1068 are: 

1069 

1070 - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT' 

1071 - '%m/%d/%y' e.g. '01/30/08' (less precise) 

1072 

1073 If the certificate is no more valid at the date considered, then 

1074 a negative value is returned representing the number of days 

1075 since it has expired. 

1076 

1077 The number of days is returned as a float to deal with the unlikely 

1078 case of certificates that are still just valid. 

1079 """ 

1080 if now is None: 

1081 now = time.localtime() 

1082 elif isinstance(now, str): 

1083 try: 

1084 if "/" in now: 

1085 now = time.strptime(now, "%m/%d/%y") 

1086 else: 

1087 now = time.strptime(now, "%b %d %H:%M:%S %Y %Z") 

1088 except Exception: 

1089 warning("Bad time string provided, will use localtime() instead.") 

1090 now = time.localtime() 

1091 

1092 now = time.mktime(now) 

1093 nft = time.mktime(self.notAfter) 

1094 diff = (nft - now) / (24.0 * 3600) 

1095 return diff 

1096 

1097 def isRevoked(self, crl_list): 

1098 """ 

1099 Given a list of trusted CRL (their signature has already been 

1100 verified with trusted anchors), this function returns True if 

1101 the certificate is marked as revoked by one of those CRL. 

1102 

1103 Note that if the Certificate was on hold in a previous CRL and 

1104 is now valid again in a new CRL and bot are in the list, it 

1105 will be considered revoked: this is because _all_ CRLs are 

1106 checked (not only the freshest) and revocation status is not 

1107 handled. 

1108 

1109 Also note that the check on the issuer is performed on the 

1110 Authority Key Identifier if available in _both_ the CRL and the 

1111 Cert. Otherwise, the issuers are simply compared. 

1112 """ 

1113 for c in crl_list: 

1114 if ( 

1115 self.authorityKeyID is not None 

1116 and c.authorityKeyID is not None 

1117 and self.authorityKeyID == c.authorityKeyID 

1118 ): 

1119 return self.serial in (x[0] for x in c.revoked_cert_serials) 

1120 elif self.issuer == c.issuer: 

1121 return self.serial in (x[0] for x in c.revoked_cert_serials) 

1122 return False 

1123 

1124 @property 

1125 def tbsCertificate(self): 

1126 return self.x509Cert.tbsCertificate 

1127 

1128 @property 

1129 def pem(self): 

1130 return der2pem(self.der, self.marker) 

1131 

1132 @property 

1133 def der(self): 

1134 return bytes(self.x509Cert) 

1135 

1136 @property 

1137 def pubKey(self): 

1138 warnings.warn( 

1139 "Cert.pubKey is deprecated and will be removed in a future version. " 

1140 "Use Cert.pubkey", 

1141 DeprecationWarning, 

1142 ) 

1143 return self.pubkey 

1144 

1145 def __eq__(self, other): 

1146 return self.der == other.der 

1147 

1148 def __hash__(self): 

1149 return hash(self.der) 

1150 

1151 def export(self, filename, fmt=None): 

1152 """ 

1153 Export certificate in 'fmt' format (DER or PEM) to file 'filename' 

1154 """ 

1155 if fmt is None: 

1156 if filename.endswith(".pem"): 

1157 fmt = "PEM" 

1158 else: 

1159 fmt = "DER" 

1160 with open(filename, "wb") as f: 

1161 if fmt == "DER": 

1162 return f.write(self.der) 

1163 elif fmt == "PEM": 

1164 return f.write(self.pem.encode()) 

1165 

1166 def show(self): 

1167 print("Serial: %s" % self.serial) 

1168 print("Issuer: " + self.issuer_str) 

1169 print("Subject: " + self.subject_str) 

1170 print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str)) 

1171 

1172 def __repr__(self): 

1173 return "[X.509 Cert. Subject:%s, Issuer:%s]" % ( 

1174 self.subject_str, 

1175 self.issuer_str, 

1176 ) 

1177 

1178 

1179################################ 

1180# Certificate Revocation Lists # 

1181################################ 

1182 

1183 

1184class _CRLMaker(_PKIObjMaker): 

1185 """ 

1186 Metaclass for CRL creation. It is not necessary as it was for the keys, 

1187 but we reuse the model instead of creating redundant constructors. 

1188 """ 

1189 

1190 def __call__(cls, cert_path): 

1191 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL") 

1192 obj.__class__ = CRL 

1193 try: 

1194 crl = X509_CRL(obj._der) 

1195 except Exception: 

1196 raise Exception("Unable to import CRL") 

1197 obj.import_from_asn1pkt(crl) 

1198 return obj 

1199 

1200 

1201class CRL(metaclass=_CRLMaker): 

1202 """ 

1203 Wrapper for the X509_CRL from layers/x509.py. 

1204 Use the 'x509CRL' attribute to access original object. 

1205 """ 

1206 

1207 def import_from_asn1pkt(self, crl): 

1208 error_msg = "Unable to import CRL" 

1209 

1210 self.x509CRL = crl 

1211 

1212 tbsCertList = crl.tbsCertList 

1213 self.tbsCertList = bytes(tbsCertList) 

1214 

1215 if tbsCertList.version: 

1216 self.version = tbsCertList.version.val + 1 

1217 else: 

1218 self.version = 1 

1219 self.sigAlg = tbsCertList.signature.algorithm.oidname 

1220 self.issuer = tbsCertList.get_issuer() 

1221 self.issuer_str = tbsCertList.get_issuer_str() 

1222 self.issuer_hash = hash(self.issuer_str) 

1223 

1224 self.lastUpdate_str = tbsCertList.this_update.pretty_time 

1225 lastUpdate = tbsCertList.this_update.val 

1226 if lastUpdate[-1] == "Z": 

1227 lastUpdate = lastUpdate[:-1] 

1228 try: 

1229 self.lastUpdate = time.strptime(lastUpdate, "%y%m%d%H%M%S") 

1230 except Exception: 

1231 raise Exception(error_msg) 

1232 self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate) 

1233 

1234 self.nextUpdate = None 

1235 self.nextUpdate_str_simple = None 

1236 if tbsCertList.next_update: 

1237 self.nextUpdate_str = tbsCertList.next_update.pretty_time 

1238 nextUpdate = tbsCertList.next_update.val 

1239 if nextUpdate[-1] == "Z": 

1240 nextUpdate = nextUpdate[:-1] 

1241 try: 

1242 self.nextUpdate = time.strptime(nextUpdate, "%y%m%d%H%M%S") 

1243 except Exception: 

1244 raise Exception(error_msg) 

1245 self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate) 

1246 

1247 if tbsCertList.crlExtensions: 

1248 for extension in tbsCertList.crlExtensions: 

1249 if extension.extnID.oidname == "cRLNumber": 

1250 self.number = extension.extnValue.cRLNumber.val 

1251 

1252 revoked = [] 

1253 if tbsCertList.revokedCertificates: 

1254 for cert in tbsCertList.revokedCertificates: 

1255 serial = cert.serialNumber.val 

1256 date = cert.revocationDate.val 

1257 if date[-1] == "Z": 

1258 date = date[:-1] 

1259 try: 

1260 time.strptime(date, "%y%m%d%H%M%S") 

1261 except Exception: 

1262 raise Exception(error_msg) 

1263 revoked.append((serial, date)) 

1264 self.revoked_cert_serials = revoked 

1265 

1266 self.signatureValue = bytes(crl.signatureValue) 

1267 self.signatureLen = len(self.signatureValue) 

1268 

1269 def isIssuer(self, other): 

1270 # This is exactly the same thing as in Cert method. 

1271 if self.issuer_hash != other.subject_hash: 

1272 return False 

1273 return other.pubkey.verifyCert(self) 

1274 

1275 def verify(self, anchors): 

1276 # Return True iff the CRL is signed by one of the provided anchors. 

1277 return any(self.isIssuer(a) for a in anchors) 

1278 

1279 def show(self): 

1280 print("Version: %d" % self.version) 

1281 print("sigAlg: " + self.sigAlg) 

1282 print("Issuer: " + self.issuer_str) 

1283 print("lastUpdate: %s" % self.lastUpdate_str) 

1284 print("nextUpdate: %s" % self.nextUpdate_str) 

1285 

1286 

1287############################### 

1288# Certificate Signing Request # 

1289############################### 

1290 

1291 

1292class _CSRMaker(_PKIObjMaker): 

1293 """ 

1294 Metaclass for CSR creation. It is not necessary as it was for the keys, 

1295 but we reuse the model instead of creating redundant constructors. 

1296 """ 

1297 

1298 def __call__(cls, cert_path): 

1299 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CSR_SIZE) 

1300 obj.__class__ = CSR 

1301 try: 

1302 # PKCS#10 format 

1303 csr = PKCS10_CertificationRequest(obj._der) 

1304 obj.marker = "NEW CERTIFICATE REQUEST" 

1305 obj.fmt = CSR.FORMAT.PKCS10 

1306 except Exception: 

1307 try: 

1308 # CMC format 

1309 csr = CMS_ContentInfo(obj._der) 

1310 obj.marker = "NEW CERTIFICATE REQUEST" 

1311 obj.fmt = CSR.FORMAT.CMC 

1312 except Exception: 

1313 raise Exception("Unable to import CSR") 

1314 

1315 obj.import_from_asn1pkt(csr) 

1316 return obj 

1317 

1318 

1319class CSR(metaclass=_CSRMaker): 

1320 """ 

1321 Wrapper for the CSR formats. 

1322 This can handle both PKCS#10 and CMC formats. 

1323 """ 

1324 

1325 class FORMAT(enum.Enum): 

1326 """ 

1327 The format used by the CSR. 

1328 """ 

1329 

1330 PKCS10 = "PKCS#10" 

1331 CMC = "CMC" 

1332 

1333 def import_from_asn1pkt(self, csr): 

1334 self.csr = csr 

1335 certReqInfo = self.certReq.certificationRequestInfo 

1336 

1337 # Subject 

1338 self.subject = certReqInfo.get_subject() 

1339 self.subject_str = certReqInfo.get_subject_str() 

1340 self.subject_hash = hash(self.subject_str) 

1341 

1342 # pubkey 

1343 self.pubkey = PubKey(bytes(certReqInfo.subjectPublicKeyInfo)) 

1344 

1345 # Get the "subjectKeyIdentifier" from the "extensionRequest" attribute 

1346 try: 

1347 extReq = next( 

1348 x.values[0].value 

1349 for x in certReqInfo.attributes 

1350 if x.type.val == "1.2.840.113549.1.9.14" # extKeyUsage 

1351 ) 

1352 self.sid = next( 

1353 x.extnValue.keyIdentifier 

1354 for x in extReq.extensions 

1355 if x.extnID.val == "2.5.29.14" # subjectKeyIdentifier 

1356 ) 

1357 except StopIteration: 

1358 self.sid = None 

1359 

1360 @property 

1361 def certReq(self): 

1362 csr = self.csr 

1363 

1364 if self.fmt == CSR.FORMAT.PKCS10: 

1365 return csr 

1366 elif self.fmt == CSR.FORMAT.CMC: 

1367 if ( 

1368 csr.contentType.oidname != "id-signedData" 

1369 or csr.content.encapContentInfo.eContentType.oidname != "id-cct-PKIData" 

1370 ): 

1371 raise ValueError("Invalid CMC wrapping !") 

1372 req = csr.content.encapContentInfo.eContent.reqSequence[0] 

1373 return req.request.certificationRequest 

1374 else: 

1375 raise ValueError("Invalid CSR format !") 

1376 

1377 @property 

1378 def pem(self): 

1379 return der2pem(self.der, self.marker) 

1380 

1381 @property 

1382 def der(self): 

1383 return bytes(self.csr) 

1384 

1385 def __eq__(self, other): 

1386 return self.der == other.der 

1387 

1388 def __hash__(self): 

1389 return hash(self.der) 

1390 

1391 def isIssuer(self, other): 

1392 return other.sid == self.sid 

1393 

1394 def isSelfSigned(self): 

1395 return True 

1396 

1397 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 

1398 return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L) 

1399 

1400 def export(self, filename, fmt=None): 

1401 """ 

1402 Export certificate in 'fmt' format (DER or PEM) to file 'filename' 

1403 """ 

1404 if fmt is None: 

1405 if filename.endswith(".pem"): 

1406 fmt = "PEM" 

1407 else: 

1408 fmt = "DER" 

1409 with open(filename, "wb") as f: 

1410 if fmt == "DER": 

1411 return f.write(self.der) 

1412 elif fmt == "PEM": 

1413 return f.write(self.pem.encode()) 

1414 

1415 def show(self): 

1416 certReqInfo = self.certReq.certificationRequestInfo 

1417 

1418 print("Subject: " + self.subject_str) 

1419 print("Attributes:") 

1420 for attr in certReqInfo.attributes: 

1421 print(" - %s" % attr.type.oidname) 

1422 

1423 def verifySelf(self) -> bool: 

1424 """ 

1425 Verify the signatures of the CSR 

1426 """ 

1427 if self.fmt == self.FORMAT.CMC: 

1428 try: 

1429 cms_engine = CMS_Engine([self]) 

1430 cms_engine.verify(self.csr) 

1431 return self.pubkey.verifyCsr(self) 

1432 except ValueError: 

1433 return False 

1434 elif self.fmt == self.FORMAT.PKCS10: 

1435 return self.pubkey.verifyCsr(self) 

1436 else: 

1437 return False 

1438 

1439 def __repr__(self): 

1440 return "[CSR Format: %s, Subject:%s, Verified: %s]" % ( 

1441 self.fmt.value, 

1442 self.subject_str, 

1443 self.verifySelf(), 

1444 ) 

1445 

1446 

1447#################### 

1448# Certificate list # 

1449#################### 

1450 

1451 

1452class CertList(list): 

1453 """ 

1454 An object that can store a list of Cert objects, load them and export them 

1455 into DER/PEM format. 

1456 """ 

1457 

1458 def __init__( 

1459 self, 

1460 certList: Union[Self, List[Cert], List[CSR], Cert, str], 

1461 ): 

1462 """ 

1463 Construct a list of certificates/CRLs to be used as list of ROOT certificates. 

1464 """ 

1465 # Parse the certificate list / CA 

1466 if isinstance(certList, str): 

1467 # It's a path. First get the _PKIObj 

1468 obj = _PKIObjMaker.__call__( 

1469 CertList, certList, _MAX_CERT_SIZE, "CERTIFICATE" 

1470 ) 

1471 

1472 # Then parse the der until there's nothing left 

1473 certList = [] 

1474 payload = obj._der 

1475 while payload: 

1476 cert = X509_Cert(payload) 

1477 if conf.raw_layer in cert.payload: 

1478 payload = cert.payload.load 

1479 else: 

1480 payload = None 

1481 cert.remove_payload() 

1482 certList.append(Cert(cert)) 

1483 

1484 self.frmt = obj.frmt 

1485 elif isinstance(certList, Cert): 

1486 certList = [certList] 

1487 self.frmt = "PEM" 

1488 else: 

1489 self.frmt = "PEM" 

1490 

1491 super(CertList, self).__init__(certList) 

1492 

1493 def findCertBySid(self, sid): 

1494 """ 

1495 Find a certificate in the list by SubjectIDentifier. 

1496 """ 

1497 for cert in self: 

1498 if isinstance(cert, Cert) and isinstance(sid, CMS_IssuerAndSerialNumber): 

1499 if cert.issuer == sid.get_issuer(): 

1500 return cert 

1501 elif isinstance(cert, CSR) and isinstance(sid, CMS_SubjectKeyIdentifier): 

1502 if cert.sid == sid.sid: 

1503 return cert 

1504 raise KeyError("Certificate not found !") 

1505 

1506 def export(self, filename, fmt=None): 

1507 """ 

1508 Export a list of certificates 'fmt' format (DER or PEM) to file 'filename' 

1509 """ 

1510 if fmt is None: 

1511 if filename.endswith(".pem"): 

1512 fmt = "PEM" 

1513 else: 

1514 fmt = "DER" 

1515 with open(filename, "wb") as f: 

1516 if fmt == "DER": 

1517 return f.write(self.der) 

1518 elif fmt == "PEM": 

1519 return f.write(self.pem.encode()) 

1520 

1521 @property 

1522 def der(self): 

1523 return b"".join(x.der for x in self) 

1524 

1525 @property 

1526 def pem(self): 

1527 return "".join(x.pem for x in self) 

1528 

1529 def __repr__(self): 

1530 return "<CertList %s certificates>" % (len(self),) 

1531 

1532 def show(self): 

1533 for i, c in enumerate(self): 

1534 print(conf.color_theme.id(i, fmt="%04i"), end=" ") 

1535 print(repr(c)) 

1536 

1537 

1538###################### 

1539# Certificate chains # 

1540###################### 

1541 

1542 

1543class CertTree(CertList): 

1544 """ 

1545 An extension to CertList that additionally has a list of ROOT CAs 

1546 that are trusted. 

1547 

1548 Example:: 

1549 

1550 >>> tree = CertTree("ca_chain.pem") 

1551 >>> tree.show() 

1552 /CN=DOMAIN-DC1-CA/dc=DOMAIN [Self Signed] 

1553 /CN=Administrator/dc=DOMAIN [Not Self Signed] 

1554 """ 

1555 

1556 __slots__ = ["frmt", "rootCAs"] 

1557 

1558 def __init__( 

1559 self, 

1560 certList: Union[List[Cert], CertList, str], 

1561 rootCAs: Union[List[Cert], CertList, Cert, str, None] = None, 

1562 ): 

1563 """ 

1564 Construct a chain of certificates that follows issuer/subject matching and 

1565 respects signature validity. 

1566 

1567 Note that we do not check AKID/{SKID/issuer/serial} matching, 

1568 nor the presence of keyCertSign in keyUsage extension (if present). 

1569 

1570 :param certList: a list of Cert/CRL objects (or path to PEM/DER file containing 

1571 multiple certs/CRL) to try to chain. 

1572 :param rootCAs: (optional) a list of certificates to trust. If not provided, 

1573 trusts any self-signed certificates from the certList. 

1574 """ 

1575 # Parse the certificate list 

1576 certList = CertList(certList) 

1577 

1578 # Find the ROOT CAs if store isn't specified 

1579 if not rootCAs: 

1580 # Build cert store. 

1581 self.rootCAs = CertList([x for x in certList if x.isSelfSigned()]) 

1582 # And remove those certs from the list 

1583 for cert in self.rootCAs: 

1584 certList.remove(cert) 

1585 else: 

1586 # Store cert store. 

1587 self.rootCAs = CertList(rootCAs) 

1588 # And remove those certs from the list if present (remove dups) 

1589 for cert in self.rootCAs: 

1590 if cert in certList: 

1591 certList.remove(cert) 

1592 

1593 # Append our root CAs to the certList 

1594 certList.extend(self.rootCAs) 

1595 

1596 # Super instantiate 

1597 super(CertTree, self).__init__(certList) 

1598 

1599 @property 

1600 def tree(self): 

1601 """ 

1602 Get a tree-like object of the certificate list 

1603 """ 

1604 # We store the tree object as a dictionary that contains children. 

1605 tree = [(x, []) for x in self.rootCAs] 

1606 

1607 # We'll empty this list eventually 

1608 certList = list(self) 

1609 

1610 # We make a list of certificates we have to search children for, and iterate 

1611 # through it until it's empty. 

1612 todo = list(tree) 

1613 

1614 # Iterate 

1615 while todo: 

1616 cert, children = todo.pop() 

1617 for c in certList: 

1618 # Check if this certificate matches the one we're looking at 

1619 if c.isIssuer(cert) and c != cert: 

1620 item = (c, []) 

1621 children.append(item) 

1622 certList.remove(c) 

1623 todo.append(item) 

1624 

1625 return tree 

1626 

1627 def getchain(self, cert): 

1628 """ 

1629 Return a chain of certificate that points from a ROOT CA to a certificate. 

1630 """ 

1631 

1632 def _rec_getchain(chain, curtree): 

1633 # See if an element of the current tree signs the cert, if so add it to 

1634 # the chain, else recurse. 

1635 for c, subtree in curtree: 

1636 curchain = chain + [c] 

1637 # If 'cert' is issued by c 

1638 if cert.isIssuer(c): 

1639 # Final node of the chain ! 

1640 # (add the final cert if not self signed) 

1641 if c != cert: 

1642 curchain += [cert] 

1643 return curchain 

1644 else: 

1645 # Not the final node of the chain ! Recurse. 

1646 curchain = _rec_getchain(curchain, subtree) 

1647 if curchain: 

1648 return curchain 

1649 return None 

1650 

1651 chain = _rec_getchain([], self.tree) 

1652 if chain is not None: 

1653 return CertTree(chain) 

1654 else: 

1655 return None 

1656 

1657 def verify(self, cert): 

1658 """ 

1659 Verify that a certificate is properly signed. 

1660 """ 

1661 # Check that we can find a chain to this certificate 

1662 if not self.getchain(cert): 

1663 raise ValueError("Certificate verification failed !") 

1664 

1665 def show(self, ret: bool = False): 

1666 """ 

1667 Return the CertTree as a string certificate tree 

1668 """ 

1669 

1670 def _rec_show(c, children, lvl=0): 

1671 s = "" 

1672 # Process the current CA 

1673 if c: 

1674 if not c.isSelfSigned(): 

1675 s += "%s [Not Self Signed]\n" % c.subject_str 

1676 else: 

1677 s += "%s [Self Signed]\n" % c.subject_str 

1678 s = lvl * " " + s 

1679 lvl += 1 

1680 # Process all sub-CAs at a lower level 

1681 for child, subchildren in children: 

1682 s += _rec_show(child, subchildren, lvl=lvl) 

1683 return s 

1684 

1685 showed = _rec_show(None, self.tree) 

1686 if ret: 

1687 return showed 

1688 else: 

1689 print(showed) 

1690 

1691 def __repr__(self): 

1692 return "<CertTree %s certificates (%s ROOT CA)>" % ( 

1693 len(self), 

1694 len(self.rootCAs), 

1695 ) 

1696 

1697 

1698####### 

1699# CMS # 

1700####### 

1701 

1702# RFC3852 

1703 

1704 

1705class CMS_Engine: 

1706 """ 

1707 A utility class to perform CMS/PKCS7 operations, as specified by RFC3852. 

1708 

1709 :param store: a ROOT CA certificate list to trust. 

1710 :param crls: a list of CRLs to include. This is currently not checked. 

1711 """ 

1712 

1713 def __init__( 

1714 self, 

1715 store: CertList, 

1716 crls: List[X509_CRL] = [], 

1717 ): 

1718 self.store = store 

1719 self.crls = crls 

1720 

1721 def sign( 

1722 self, 

1723 message: Union[bytes, Packet], 

1724 eContentType: ASN1_OID, 

1725 cert: Cert, 

1726 key: PrivKey, 

1727 h: Optional[str] = None, 

1728 ): 

1729 """ 

1730 Sign a message using CMS. 

1731 

1732 :param message: the inner content to sign. 

1733 :param eContentType: the OID of the inner content. 

1734 :param cert: the certificate whose key to use use for signing. 

1735 :param key: the private key to use for signing. 

1736 :param h: the hash to use (default: same as the certificate's signature) 

1737 

1738 We currently only support X.509 certificates ! 

1739 """ 

1740 # RFC3852 - 5.4. Message Digest Calculation Process 

1741 h = h or _get_cert_sig_hashname(cert) 

1742 hash = hashes.Hash(_get_hash(h)) 

1743 hash.update(bytes(message)) 

1744 hashed_message = hash.finalize() 

1745 

1746 # 5.5. Signature Generation Process 

1747 signerInfo = CMS_SignerInfo( 

1748 version=1, 

1749 sid=CMS_IssuerAndSerialNumber( 

1750 issuer=cert.tbsCertificate.issuer, 

1751 serialNumber=cert.tbsCertificate.serialNumber, 

1752 ), 

1753 digestAlgorithm=X509_AlgorithmIdentifier( 

1754 algorithm=ASN1_OID(h), 

1755 parameters=ASN1_NULL(0), 

1756 ), 

1757 signedAttrs=[ 

1758 X509_Attribute( 

1759 type=ASN1_OID("contentType"), 

1760 values=[ 

1761 X509_AttributeValue(value=eContentType), 

1762 ], 

1763 ), 

1764 X509_Attribute( 

1765 type=ASN1_OID("messageDigest"), 

1766 # "A message-digest attribute MUST have a single attribute value" 

1767 values=[ 

1768 X509_AttributeValue(value=ASN1_STRING(hashed_message)), 

1769 ], 

1770 ), 

1771 ], 

1772 signatureAlgorithm=cert.tbsCertificate.signature, 

1773 ) 

1774 signerInfo.signature = ASN1_STRING( 

1775 key.sign( 

1776 bytes( 

1777 CMS_SignedAttrsForSignature( 

1778 signedAttrs=signerInfo.signedAttrs, 

1779 ) 

1780 ), 

1781 h=h, 

1782 ) 

1783 ) 

1784 

1785 # Build a chain of X509_Cert to ship (but skip the ROOT certificate) 

1786 certTree = CertTree(cert, self.store) 

1787 certificates = [x.x509Cert for x in certTree if not x.isSelfSigned()] 

1788 

1789 # Build final structure 

1790 return CMS_ContentInfo( 

1791 contentType=ASN1_OID("id-signedData"), 

1792 content=CMS_SignedData( 

1793 version=3 if certificates else 1, 

1794 digestAlgorithms=X509_AlgorithmIdentifier( 

1795 algorithm=ASN1_OID(h), 

1796 parameters=ASN1_NULL(0), 

1797 ), 

1798 encapContentInfo=CMS_EncapsulatedContentInfo( 

1799 eContentType=eContentType, 

1800 eContent=message, 

1801 ), 

1802 certificates=( 

1803 [CMS_CertificateChoices(certificate=cert) for cert in certificates] 

1804 if certificates 

1805 else None 

1806 ), 

1807 crls=( 

1808 [CMS_RevocationInfoChoice(crl=crl) for crl in self.crls] 

1809 if self.crls 

1810 else None 

1811 ), 

1812 signerInfos=[ 

1813 signerInfo, 

1814 ], 

1815 ), 

1816 ) 

1817 

1818 def verify( 

1819 self, 

1820 contentInfo: CMS_ContentInfo, 

1821 eContentType: Optional[ASN1_OID] = None, 

1822 ): 

1823 """ 

1824 Verify a CMS message against the list of trusted certificates, 

1825 and return the unpacked message if the verification succeeds. 

1826 

1827 :param contentInfo: the ContentInfo whose signature to verify 

1828 :param eContentType: if provided, verifies that the content type is valid 

1829 """ 

1830 if contentInfo.contentType.oidname != "id-signedData": 

1831 raise ValueError("ContentInfo isn't signed !") 

1832 

1833 signeddata = contentInfo.content 

1834 

1835 # Build the certificate chain 

1836 certificates = [] 

1837 if signeddata.certificates: 

1838 certificates = [Cert(x.certificate) for x in signeddata.certificates] 

1839 certTree = CertTree(certificates, self.store) 

1840 

1841 # Check there's at least one signature 

1842 if not signeddata.signerInfos: 

1843 raise ValueError("ContentInfo contained no signature !") 

1844 

1845 # Check all signatures 

1846 for signerInfo in signeddata.signerInfos: 

1847 # Find certificate in the chain that did this 

1848 cert: Cert = certTree.findCertBySid(signerInfo.sid) 

1849 

1850 # Verify certificate signature 

1851 certTree.verify(cert) 

1852 

1853 # Verify the message hash 

1854 if signerInfo.signedAttrs: 

1855 # Verify the contentType 

1856 try: 

1857 contentType = next( 

1858 x.values[0].value 

1859 for x in signerInfo.signedAttrs 

1860 if x.type.oidname == "contentType" 

1861 ) 

1862 

1863 if contentType != signeddata.encapContentInfo.eContentType: 

1864 raise ValueError( 

1865 "Inconsistent 'contentType' was detected in packet !" 

1866 ) 

1867 

1868 if eContentType is not None and eContentType != contentType: 

1869 raise ValueError( 

1870 "Expected '%s' but got '%s' contentType !" 

1871 % ( 

1872 eContentType, 

1873 contentType, 

1874 ) 

1875 ) 

1876 except StopIteration: 

1877 raise ValueError("Missing contentType in signedAttrs !") 

1878 

1879 # Verify the messageDigest value 

1880 try: 

1881 # "A message-digest attribute MUST have a single attribute value" 

1882 messageDigest = next( 

1883 x.values[0].value 

1884 for x in signerInfo.signedAttrs 

1885 if x.type.oidname == "messageDigest" 

1886 ) 

1887 

1888 # Re-calculate hash 

1889 h = signerInfo.digestAlgorithm.algorithm.oidname 

1890 hash = hashes.Hash(_get_hash(h)) 

1891 hash.update(bytes(signeddata.encapContentInfo.eContent)) 

1892 hashed_message = hash.finalize() 

1893 

1894 if hashed_message != messageDigest: 

1895 raise ValueError("Invalid messageDigest value !") 

1896 except StopIteration: 

1897 raise ValueError("Missing messageDigest in signedAttrs !") 

1898 

1899 # Verify the signature 

1900 cert.verify( 

1901 msg=bytes( 

1902 CMS_SignedAttrsForSignature( 

1903 signedAttrs=signerInfo.signedAttrs, 

1904 ) 

1905 ), 

1906 sig=signerInfo.signature.val, 

1907 ) 

1908 else: 

1909 cert.verify( 

1910 msg=bytes(signeddata.encapContentInfo), 

1911 sig=signerInfo.signature.val, 

1912 ) 

1913 

1914 # Return the content 

1915 return signeddata.encapContentInfo.eContent