Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/tls/cert.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

688 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net> 

5# <arno@natisbad.org> 

6# 2015, 2016, 2017 Maxence Tury <maxence.tury@ssi.gouv.fr> 

7 

8""" 

9High-level methods for PKI objects (X.509 certificates, CRLs, asymmetric keys). 

10Supports both RSA and ECDSA objects. 

11 

12The classes below are wrappers for the ASN.1 objects defined in x509.py. 

13For instance, here is what you could do in order to modify the subject public 

14key info of a 'cert' and then resign it with whatever 'key':: 

15 

16 from scapy.layers.tls.cert import * 

17 cert = Cert("cert.der") 

18 k = PrivKeyRSA() # generate a private key 

19 cert.setSubjectPublicKeyFromPrivateKey(k) 

20 cert.resignWith(k) 

21 cert.export("newcert.pem") 

22 k.export("mykey.pem") 

23 

24One could also edit arguments like the serial number, as such:: 

25 

26 from scapy.layers.tls.cert import * 

27 c = Cert("mycert.pem") 

28 c.tbsCertificate.serialNumber = 0x4B1D 

29 k = PrivKey("mykey.pem") # import an existing private key 

30 c.resignWith(k) 

31 c.export("newcert.pem") 

32 

33To export the public key of a private key:: 

34 

35 k = PrivKey("mykey.pem") 

36 k.pubkey.export("mypubkey.pem") 

37 

38No need for obnoxious openssl tweaking anymore. :) 

39""" 

40 

41import base64 

42import os 

43import time 

44 

45from scapy.config import conf, crypto_validator 

46from scapy.error import warning 

47from scapy.utils import binrepr 

48from scapy.asn1.asn1 import ASN1_BIT_STRING 

49from scapy.asn1.mib import hash_by_oid 

50from scapy.layers.x509 import ( 

51 ECDSAPrivateKey_OpenSSL, 

52 ECDSAPrivateKey, 

53 ECDSAPublicKey, 

54 EdDSAPublicKey, 

55 EdDSAPrivateKey, 

56 RSAPrivateKey_OpenSSL, 

57 RSAPrivateKey, 

58 RSAPublicKey, 

59 X509_Cert, 

60 X509_CRL, 

61 X509_SubjectPublicKeyInfo, 

62) 

63from scapy.layers.tls.crypto.pkcs1 import pkcs_os2ip, _get_hash, \ 

64 _EncryptAndVerifyRSA, _DecryptAndSignRSA 

65from scapy.compat import raw, bytes_encode 

66 

67if conf.crypto_valid: 

68 from cryptography.exceptions import InvalidSignature 

69 from cryptography.hazmat.backends import default_backend 

70 from cryptography.hazmat.primitives import serialization 

71 from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519 

72 

73 # cryptography raised the minimum RSA key length to 1024 in 43.0+ 

74 # https://github.com/pyca/cryptography/pull/10278 

75 # but we still need 512 for EXPORT40 ciphers (yes EXPORT is terrible) 

76 # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66 

77 # The following detects the change and hacks around it using the backend 

78 

79 try: 

80 rsa.generate_private_key(public_exponent=65537, key_size=512) 

81 _RSA_512_SUPPORTED = True 

82 except ValueError: 

83 # cryptography > 43.0 

84 _RSA_512_SUPPORTED = False 

85 from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl 

86 

87 

88# Maximum allowed size in bytes for a certificate file, to avoid 

89# loading huge file when importing a cert 

90_MAX_KEY_SIZE = 50 * 1024 

91_MAX_CERT_SIZE = 50 * 1024 

92_MAX_CRL_SIZE = 10 * 1024 * 1024 # some are that big 

93 

94 

95##################################################################### 

96# Some helpers 

97##################################################################### 

98 

99@conf.commands.register 

def der2pem(der_string, obj="UNKNOWN"):
    """
    Convert DER octet string to PEM format (with optional header).

    :param der_string: the DER bytes to encode.
    :param obj: label placed in the BEGIN/END lines.
    :return: the PEM text (str), base64 body wrapped at 64 characters.
    """
    encoded = base64.b64encode(der_string).decode()
    # Wrap the base64 text into lines of at most 64 characters.
    body = '\n'.join(
        encoded[pos:pos + 64] for pos in range(0, len(encoded), 64)
    )
    header = "-----BEGIN %s-----\n" % obj
    footer = "\n-----END %s-----\n" % obj
    return header + body + footer

109 

110 

111@conf.commands.register 

def pem2der(pem_string):
    """
    Convert a single PEM-encoded object to DER format.

    :param pem_string: bytes holding exactly one PEM object.
    :return: the decoded DER bytes.
    :raises Exception: if a second "-----BEGIN" marker follows the header.
    """
    # Normalise CRLF line endings so the header search below works.
    pem_string = pem_string.replace(b"\r", b"")
    # The payload starts right after the first '-----\n' (end of BEGIN line).
    first_idx = pem_string.find(b"-----\n") + 6
    if pem_string.find(b"-----BEGIN", first_idx) != -1:
        raise Exception("pem2der() expects only one PEM-encoded object")
    # ... and stops at the 2nd-to-last '-----' (start of the END line).
    last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
    # bytes.replace() returns a new object: keep its result so the line
    # breaks are really stripped before decoding.
    base64_string = pem_string[first_idx:last_idx].replace(b"\n", b"")
    return base64.b64decode(base64_string)

124 

125 

def split_pem(s):
    """
    Split PEM objects. Useful to process concatenated certificates.

    :param s: bytes holding zero or more concatenated PEM objects.
    :return: list of bytes, one item per PEM object (BEGIN line through
        END line, including the trailing newline when there is one).
    :raises Exception: if a BEGIN tag has no END tag after it.
    """
    pem_strings = []
    while s != b"":
        start_idx = s.find(b"-----BEGIN")
        if start_idx == -1:
            break
        # Look for the END tag *after* the BEGIN tag, so that a stray END
        # marker ahead of the first object is not mistaken for its end.
        end_idx = s.find(b"-----END", start_idx)
        if end_idx == -1:
            raise Exception("Invalid PEM object (missing END tag)")
        end_idx = s.find(b"\n", end_idx) + 1
        if end_idx == 0:
            # There is no final \n
            end_idx = len(s)
        pem_strings.append(s[start_idx:end_idx])
        s = s[end_idx:]
    return pem_strings

145 

146 

147class _PKIObj(object): 

148 def __init__(self, frmt, der): 

149 self.frmt = frmt 

150 self._der = der 

151 

152 

153class _PKIObjMaker(type): 

154 def __call__(cls, obj_path, obj_max_size, pem_marker=None): 

155 # This enables transparent DER and PEM-encoded data imports. 

156 # Note that when importing a PEM file with multiple objects (like ECDSA 

157 # private keys output by openssl), it will concatenate every object in 

158 # order to create a 'der' attribute. When converting a 'multi' DER file 

159 # into a PEM file, though, the PEM attribute will not be valid, 

160 # because we do not try to identify the class of each object. 

161 error_msg = "Unable to import data" 

162 

163 if obj_path is None: 

164 raise Exception(error_msg) 

165 obj_path = bytes_encode(obj_path) 

166 

167 if (b'\x00' not in obj_path) and os.path.isfile(obj_path): 

168 _size = os.path.getsize(obj_path) 

169 if _size > obj_max_size: 

170 raise Exception(error_msg) 

171 try: 

172 with open(obj_path, "rb") as f: 

173 _raw = f.read() 

174 except Exception: 

175 raise Exception(error_msg) 

176 else: 

177 _raw = obj_path 

178 

179 try: 

180 if b"-----BEGIN" in _raw: 

181 frmt = "PEM" 

182 pem = _raw 

183 der_list = split_pem(pem) 

184 der = b''.join(map(pem2der, der_list)) 

185 else: 

186 frmt = "DER" 

187 der = _raw 

188 except Exception: 

189 raise Exception(error_msg) 

190 

191 p = _PKIObj(frmt, der) 

192 return p 

193 

194 

195##################################################################### 

196# PKI objects wrappers 

197##################################################################### 

198 

199############### 

200# Public Keys # 

201############### 

202 

class _PubKeyFactory(_PKIObjMaker):
    """
    Metaclass for PubKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """
    def __call__(cls, key_path=None, cryptography_obj=None):
        """
        Build a public key object.

        :param key_path: None (generate a new key), a tuple (rare RSA
            'kx export' call, handed to import_from_tuple()), or a path /
            raw data for an X509_SubjectPublicKeyInfo or an RSAPublicKey.
        :param cryptography_obj: an existing cryptography public key; when
            given it is stored as-is and 'key_path' is ignored.
        :raises Exception: when the data cannot be imported as a public key.
        """
        # This allows to import cryptography objects directly
        if cryptography_obj is not None:
            obj = type.__call__(cls)
            obj.__class__ = cls
            obj.frmt = "original"
            obj.marker = "PUBLIC KEY"
            obj.pubkey = cryptography_obj
            return obj

        if key_path is None:
            obj = type.__call__(cls)
            # A bare PubKey() defaults to an RSA key.
            if cls is PubKey:
                cls = PubKeyRSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        # This deals with the rare RSA 'kx export' call.
        if isinstance(key_path, tuple):
            obj = type.__call__(cls)
            obj.__class__ = PubKeyRSA
            obj.frmt = "tuple"
            obj.import_from_tuple(key_path)
            return obj

        # Now for the usual calls, key_path may be the path to either:
        # _an X509_SubjectPublicKeyInfo, as processed by openssl;
        # _an RSAPublicKey;
        # _an ECDSAPublicKey;
        # _an EdDSAPublicKey.
        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
        try:
            spki = X509_SubjectPublicKeyInfo(obj._der)
            pubkey = spki.subjectPublicKey
            if isinstance(pubkey, RSAPublicKey):
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
            elif isinstance(pubkey, ECDSAPublicKey):
                obj.__class__ = PubKeyECDSA
                obj.import_from_der(obj._der)
            elif isinstance(pubkey, EdDSAPublicKey):
                obj.__class__ = PubKeyEdDSA
                obj.import_from_der(obj._der)
            else:
                # Explicit error instead of a bare 'raise' with no active
                # exception; it is caught just below to try the fallback.
                raise ValueError("Unsupported subjectPublicKey type")
            obj.marker = "PUBLIC KEY"
        except Exception:
            # Fallback: the data may be a bare RSAPublicKey structure.
            try:
                pubkey = RSAPublicKey(obj._der)
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
                obj.marker = "RSA PUBLIC KEY"
            except Exception:
                # We cannot import an ECDSA public key without curve knowledge
                if conf.debug_dissector:
                    raise
                raise Exception("Unable to import public key")
        return obj

269 

270 

class PubKey(metaclass=_PubKeyFactory):
    """
    Parent class for PubKeyRSA, PubKeyECDSA and PubKeyEdDSA.
    Provides common verifyCert() and export() methods.
    The wrapped key object is stored in the 'pubkey' attribute.
    """

    def verifyCert(self, cert):
        """ Verifies either a Cert or an X509_Cert. """
        tbs = cert.tbsCertificate
        # The hash is looked up from the signature OID of the TBS part.
        hash_name = hash_by_oid[tbs.signature.algorithm.val]
        signature = raw(cert.signatureValue)
        return self.verify(raw(tbs), signature, h=hash_name, t='pkcs')

    @property
    def pem(self):
        """PEM text of the key, labelled with self.marker."""
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        """DER bytes of the key, as a SubjectPublicKeyInfo structure."""
        return self.pubkey.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

    def public_numbers(self, *args, **kwargs):
        """Forward to public_numbers() of the wrapped key object."""
        return self.pubkey.public_numbers(*args, **kwargs)

    @property
    def key_size(self):
        """key_size attribute of the wrapped key object."""
        return self.pubkey.key_size

    def export(self, filename, fmt=None):
        """
        Export public key in 'fmt' format (DER or PEM) to file 'filename'

        When 'fmt' is None, PEM is chosen if 'filename' ends with ".pem" and
        DER otherwise. Returns the value of the file write() call; nothing
        is written for any other 'fmt' value.
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        with open(filename, "wb") as fd:
            if fmt == "DER":
                return fd.write(self.der)
            if fmt == "PEM":
                return fd.write(self.pem.encode())

317 

318 

class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
    """
    Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py
    The cryptography key object is stored in the 'pubkey' attribute.
    """
    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
        """
        Store the public key described by 'modulus' and 'pubExp', or
        generate a new one ('modulusLen' bits, 2048 by default) when no
        modulus is given. 'pubExp' defaults to 65537.
        """
        pubExp = pubExp or 65537
        if modulus:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            numbers = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            self.pubkey = numbers.public_key(default_backend())
        else:
            real_modulusLen = modulusLen or 2048
            if real_modulusLen >= 1024 or _RSA_512_SUPPORTED:
                generated = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            else:
                # cryptography > 43.0 compatibility
                generated = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
            self.pubkey = generated.public_key()

        self.marker = "PUBLIC KEY"

        # Lines below are only useful for the legacy part of pkcs1.py
        stored = self.pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = stored.n
        self._pubExp = stored.e

    @crypto_validator
    def import_from_tuple(self, tup):
        """Import from an (exponent, modulus, modulus length) tuple."""
        # this is rarely used
        exponent, modulus, _length = tup
        # Byte strings are converted to integers first.
        if isinstance(modulus, bytes):
            modulus = pkcs_os2ip(modulus)
        if isinstance(exponent, bytes):
            exponent = pkcs_os2ip(exponent)
        self.fill_and_store(modulus=modulus, pubExp=exponent)

    def import_from_asn1pkt(self, pubkey):
        """Import from a parsed packet exposing modulus / publicExponent."""
        self.fill_and_store(
            modulus=pubkey.modulus.val,
            pubExp=pubkey.publicExponent.val,
        )

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        """Encrypt 'msg' through _EncryptAndVerifyRSA.encrypt()."""
        # no ECDSA encryption support, hence no ECDSA specific keywords here
        return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """Verify 'sig' over 'msg' through _EncryptAndVerifyRSA.verify()."""
        return _EncryptAndVerifyRSA.verify(
            self, msg, sig, t=t, h=h, mgf=mgf, L=L)

379 

380 

class PubKeyECDSA(PubKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    The cryptography key object is stored in the 'pubkey' attribute.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a key on 'curve' (ec.SECP256R1 by default), keep its public part."""
        curve_cls = curve or ec.SECP256R1
        generated = ec.generate_private_key(curve_cls(), default_backend())
        self.pubkey = generated.public_key()

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the public key from DER bytes."""
        # No lib support for explicit curves nor compressed points.
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, h="sha256", **kwargs):
        """Always raises: encryption is not available for ECDSA keys."""
        raise Exception("No ECDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """Return True if 'sig' is a valid signature of 'msg', else False."""
        # 'sig' should be a DER-encoded signature, as per RFC 3279
        try:
            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
        except InvalidSignature:
            return False
        else:
            return True

411 

412 

class PubKeyEdDSA(PubKey):
    """
    Wrapper for EdDSA keys based on the cryptography library.
    The cryptography key object is stored in the 'pubkey' attribute.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a key with 'curve'.generate() and keep its public part.
        'curve' defaults to x25519.X25519PrivateKey.
        """
        key_cls = curve or x25519.X25519PrivateKey
        self.pubkey = key_cls.generate().public_key()

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the public key from DER bytes."""
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, **kwargs):
        """Always raises: encryption is not available for EdDSA keys."""
        raise Exception("No EdDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """Return True if 'sig' is a valid signature of 'msg', else False."""
        # 'sig' and 'msg' are handed unchanged to the wrapped key's verify().
        try:
            self.pubkey.verify(sig, msg)
        except InvalidSignature:
            return False
        else:
            return True

442 

443 

444################ 

445# Private Keys # 

446################ 

447 

class _PrivKeyFactory(_PKIObjMaker):
    """
    Metaclass for PrivKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """
    def __call__(cls, key_path=None, cryptography_obj=None):
        """
        Build a private key object.

        key_path may be the path to either:
            _an RSAPrivateKey_OpenSSL (as generated by openssl);
            _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
            _an RSAPrivateKey;
            _an ECDSAPrivateKey.

        :param cryptography_obj: an existing cryptography private key. It
            takes precedence over 'key_path', including when 'key_path' is
            None (previously it was silently ignored in that case and a new
            key was generated instead).
        :raises Exception: ("Unable to import private key") when the data
            matches none of the supported structures.
        """
        if cryptography_obj is not None:
            # This allows to import cryptography objects directly.
            # We (stupidly) need to go through the whole import process because
            # RSA does more than just importing the cryptography objects...
            obj = _PKIObj("DER", cryptography_obj.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            ))
        elif key_path is None:
            # Nothing to import: generate a new key (ECDSA for bare PrivKey).
            obj = type.__call__(cls)
            if cls is PrivKey:
                cls = PrivKeyECDSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj
        else:
            # Load from file
            obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)

        # Candidate structures, tried in order. Built here rather than at
        # class level because the PrivKey* classes are defined later on.
        # Fields: (parser, unwrap '.privateKey'?, target class, PEM marker)
        candidates = (
            (RSAPrivateKey_OpenSSL, True, PrivKeyRSA, "PRIVATE KEY"),
            (ECDSAPrivateKey_OpenSSL, True, PrivKeyECDSA, "EC PRIVATE KEY"),
            (RSAPrivateKey, False, PrivKeyRSA, "RSA PRIVATE KEY"),
            (ECDSAPrivateKey, False, PrivKeyECDSA, "EC PRIVATE KEY"),
            (EdDSAPrivateKey, False, PrivKeyEdDSA, "PRIVATE KEY"),
        )
        for parser, wrapped, key_cls, marker in candidates:
            try:
                privkey = parser(obj._der)
                if wrapped:
                    privkey = privkey.privateKey
                obj.__class__ = key_cls
                obj.marker = marker
            except Exception:
                # Not this structure: try the next candidate.
                continue
            break
        else:
            raise Exception("Unable to import private key")

        try:
            obj.import_from_asn1pkt(privkey)
        except ImportError:
            # Deliberate best effort, kept from the original code
            # (presumably raised when the crypto backend is unavailable).
            pass
        return obj

517 

518 

class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING):
    """An ASN1_BIT_STRING whose byte form is its readable value (no BER encoding)."""

    def __bytes__(self):
        readable = self.val_readable
        return readable

    # str() goes through the same accessor as bytes().
    __str__ = __bytes__

524 

525 

class PrivKey(metaclass=_PrivKeyFactory):
    """
    Parent class for PrivKeyRSA, PrivKeyECDSA and PrivKeyEdDSA.
    Provides common signTBSCert(), resignCert(), verifyCert()
    and export() methods.
    The wrapped key object is stored in the 'key' attribute.
    """

    def signTBSCert(self, tbsCert, h="sha256"):
        """
        Sign 'tbsCert' and return the resulting X509_Cert.

        Note that this will always copy the signature field from the
        tbsCertificate into the signatureAlgorithm field of the result,
        regardless of the coherence between its contents (which might
        indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2).

        When 'h' is empty or None, the hash is looked up from the OID of
        the tbsCertificate signature field.

        The sign() methods of the subclasses accept any keyword argument,
        be it from the RSA or ECDSA world, and keep the ones they are
        interested in; 't' only matters to pkcs1._DecryptAndSignRSA.sign().
        """
        sig_alg = tbsCert.signature
        if not h:
            h = hash_by_oid[sig_alg.algorithm.val]
        signature = self.sign(raw(tbsCert), h=h, t='pkcs')
        signed = X509_Cert()
        signed.tbsCertificate = tbsCert
        signed.signatureAlgorithm = sig_alg
        signed.signatureValue = _Raw_ASN1_BIT_STRING(signature, readable=True)
        return signed

    def resignCert(self, cert):
        """ Rewrite the signature of either a Cert or an X509_Cert. """
        # h=None makes signTBSCert() pick the hash from the certificate.
        return self.signTBSCert(cert.tbsCertificate, h=None)

    def verifyCert(self, cert):
        """ Verifies either a Cert or an X509_Cert. """
        tbs = cert.tbsCertificate
        hash_name = hash_by_oid[tbs.signature.algorithm.val]
        signature = raw(cert.signatureValue)
        return self.verify(raw(tbs), signature, h=hash_name, t='pkcs')

    @property
    def pem(self):
        """PEM text of the key, labelled with self.marker."""
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        """Unencrypted PKCS8 DER bytes of the key."""
        return self.key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )

    def export(self, filename, fmt=None):
        """
        Export private key in 'fmt' format (DER or PEM) to file 'filename'

        When 'fmt' is None, PEM is chosen if 'filename' ends with ".pem" and
        DER otherwise. Returns the value of the file write() call; nothing
        is written for any other 'fmt' value.
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        with open(filename, "wb") as fd:
            if fmt == "DER":
                return fd.write(self.der)
            if fmt == "PEM":
                return fd.write(self.pem.encode())

594 

595 

class PrivKeyRSA(PrivKey, _DecryptAndSignRSA):
    """
    Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py
    Use the 'key' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None,
                       prime1=None, prime2=None, coefficient=None,
                       exponent1=None, exponent2=None, privExp=None):
        """
        Store the private key described by the given numbers, or generate a
        whole new one ('modulusLen' bits, 2048 by default) if any of them is
        missing. 'pubExp' defaults to 65537. The matching PubKeyRSA is
        stored in 'pubkey'.
        """
        pubExp = pubExp or 65537
        # note that the library requires every parameter
        # in order to call RSAPrivateNumbers(...)
        # if one of these is missing, we generate a whole new key
        required = (modulus, prime1, prime2, coefficient, privExp,
                    exponent1, exponent2)
        if None not in required:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            public_part = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            private_part = rsa.RSAPrivateNumbers(
                p=prime1, q=prime2,
                dmp1=exponent1, dmq1=exponent2,
                iqmp=coefficient, d=privExp,
                public_numbers=public_part,
            )
            self.key = private_part.private_key(default_backend())
        else:
            real_modulusLen = modulusLen or 2048
            if real_modulusLen >= 1024 or _RSA_512_SUPPORTED:
                self.key = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            else:
                # cryptography > 43.0 compatibility
                self.key = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
        derived = self.key.public_key()

        self.marker = "PRIVATE KEY"

        # Lines below are only useful for the legacy part of pkcs1.py
        stored = derived.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = stored.n
        self._pubExp = stored.e

        self.pubkey = PubKeyRSA((stored.e, stored.n, real_modulusLen))

    def import_from_asn1pkt(self, privkey):
        """Import from a parsed packet exposing the RSA private key fields."""
        self.fill_and_store(
            modulus=privkey.modulus.val,
            pubExp=privkey.publicExponent.val,
            privExp=privkey.privateExponent.val,
            prime1=privkey.prime1.val,
            prime2=privkey.prime2.val,
            exponent1=privkey.exponent1.val,
            exponent2=privkey.exponent2.val,
            coefficient=privkey.coefficient.val,
        )

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """Verify 'sig' over 'msg' with the associated public key."""
        return self.pubkey.verify(msg=msg, sig=sig, t=t, h=h, mgf=mgf, L=L)

    def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
        """Sign 'data' through _DecryptAndSignRSA.sign()."""
        return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L)

673 

674 

class PrivKeyECDSA(PrivKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'key' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key on 'curve' (ec.SECP256R1 by default)."""
        curve_cls = curve or ec.SECP256R1
        self.key = ec.generate_private_key(curve_cls(), default_backend())
        self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
        self.marker = "EC PRIVATE KEY"

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the raw (DER) bytes of the parsed packet."""
        der = raw(privkey)
        self.key = serialization.load_der_private_key(
            der, None, backend=default_backend()
        )
        self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
        self.marker = "EC PRIVATE KEY"

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """Verify 'sig' over 'msg' with the associated public key."""
        return self.pubkey.verify(msg=msg, sig=sig, h=h, **kwargs)

    @crypto_validator
    def sign(self, data, h="sha256", **kwargs):
        """Sign 'data' with ECDSA and hash 'h'; other keywords are ignored."""
        algorithm = ec.ECDSA(_get_hash(h))
        return self.key.sign(data, algorithm)

701 

702 

class PrivKeyEdDSA(PrivKey):
    """
    Wrapper for EdDSA keys
    Use the 'key' attribute to access original object.
    The matching public key is exposed as a PubKeyEdDSA in 'pubkey'.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key with 'curve'.generate().

        :param curve: private key class providing generate(); defaults to
            x25519.X25519PrivateKey.
        """
        # NOTE(review): the default is an X25519 (key exchange) key class;
        # confirm whether an Ed25519 signing key was intended here.
        curve = curve or x25519.X25519PrivateKey
        self.key = curve.generate()
        # Wrap in PubKeyEdDSA (not PubKeyECDSA), so that verify() calls the
        # key without an ECDSA algorithm argument.
        self.pubkey = PubKeyEdDSA(cryptography_obj=self.key.public_key())
        self.marker = "PRIVATE KEY"

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the raw (DER) bytes of the parsed packet."""
        self.key = serialization.load_der_private_key(
            raw(privkey), None, backend=default_backend()
        )
        # Same wrapper as in fill_and_store(), see above.
        self.pubkey = PubKeyEdDSA(cryptography_obj=self.key.public_key())
        self.marker = "PRIVATE KEY"

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """Verify 'sig' over 'msg' with the associated public key."""
        return self.pubkey.verify(msg=msg, sig=sig, **kwargs)

    @crypto_validator
    def sign(self, data, **kwargs):
        """Sign 'data' with the wrapped key; keyword arguments are ignored."""
        return self.key.sign(data)

729 

730 

731################ 

732# Certificates # 

733################ 

734 

class _CertMaker(_PKIObjMaker):
    """
    Metaclass for Cert creation. It is not necessary as it was for the keys,
    but we reuse the model instead of creating redundant constructors.
    """
    def __call__(cls, cert_path=None, cryptography_obj=None):
        """
        Build a Cert from a path / raw data ('cert_path') or directly from
        a cryptography certificate object ('cryptography_obj', which wins
        when both are given).
        """
        if cryptography_obj is None:
            # Load from file
            obj = _PKIObjMaker.__call__(
                cls, cert_path, _MAX_CERT_SIZE, "CERTIFICATE"
            )
        else:
            # This allows to import cryptography objects directly
            der = cryptography_obj.public_bytes(
                encoding=serialization.Encoding.DER,
            )
            obj = _PKIObj("DER", der)

        obj.__class__ = Cert
        obj.marker = "CERTIFICATE"
        try:
            parsed = X509_Cert(obj._der)
        except Exception:
            # Keep the original error when debugging dissectors.
            if conf.debug_dissector:
                raise
            raise Exception("Unable to import certificate")
        obj.import_from_asn1pkt(parsed)
        return obj

760 

761 

762class Cert(metaclass=_CertMaker): 

763 """ 

764 Wrapper for the X509_Cert from layers/x509.py. 

765 Use the 'x509Cert' attribute to access original object. 

766 """ 

767 

768 def import_from_asn1pkt(self, cert): 

769 error_msg = "Unable to import certificate" 

770 

771 self.x509Cert = cert 

772 

773 tbsCert = cert.tbsCertificate 

774 self.tbsCertificate = tbsCert 

775 

776 if tbsCert.version: 

777 self.version = tbsCert.version.val + 1 

778 else: 

779 self.version = 1 

780 self.serial = tbsCert.serialNumber.val 

781 self.sigAlg = tbsCert.signature.algorithm.oidname 

782 self.issuer = tbsCert.get_issuer() 

783 self.issuer_str = tbsCert.get_issuer_str() 

784 self.issuer_hash = hash(self.issuer_str) 

785 self.subject = tbsCert.get_subject() 

786 self.subject_str = tbsCert.get_subject_str() 

787 self.subject_hash = hash(self.subject_str) 

788 self.authorityKeyID = None 

789 

790 self.notBefore_str = tbsCert.validity.not_before.pretty_time 

791 try: 

792 self.notBefore = tbsCert.validity.not_before.datetime.timetuple() 

793 except ValueError: 

794 raise Exception(error_msg) 

795 self.notBefore_str_simple = time.strftime("%x", self.notBefore) 

796 

797 self.notAfter_str = tbsCert.validity.not_after.pretty_time 

798 try: 

799 self.notAfter = tbsCert.validity.not_after.datetime.timetuple() 

800 except ValueError: 

801 raise Exception(error_msg) 

802 self.notAfter_str_simple = time.strftime("%x", self.notAfter) 

803 

804 self.pubKey = PubKey(raw(tbsCert.subjectPublicKeyInfo)) 

805 

806 if tbsCert.extensions: 

807 for extn in tbsCert.extensions: 

808 if extn.extnID.oidname == "basicConstraints": 

809 self.cA = False 

810 if extn.extnValue.cA: 

811 self.cA = not (extn.extnValue.cA.val == 0) 

812 elif extn.extnID.oidname == "keyUsage": 

813 self.keyUsage = extn.extnValue.get_keyUsage() 

814 elif extn.extnID.oidname == "extKeyUsage": 

815 self.extKeyUsage = extn.extnValue.get_extendedKeyUsage() 

816 elif extn.extnID.oidname == "authorityKeyIdentifier": 

817 self.authorityKeyID = extn.extnValue.keyIdentifier.val 

818 

819 self.signatureValue = raw(cert.signatureValue) 

820 self.signatureLen = len(self.signatureValue) 

821 

822 def isIssuerCert(self, other): 

823 """ 

824 True if 'other' issued 'self', i.e.: 

825 - self.issuer == other.subject 

826 - self is signed by other 

827 """ 

828 if self.issuer_hash != other.subject_hash: 

829 return False 

830 return other.pubKey.verifyCert(self) 

831 

832 def isSelfSigned(self): 

833 """ 

834 Return True if the certificate is self-signed: 

835 - issuer and subject are the same 

836 - the signature of the certificate is valid. 

837 """ 

838 if self.issuer_hash == self.subject_hash: 

839 return self.isIssuerCert(self) 

840 return False 

841 

842 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): 

843 # no ECDSA *encryption* support, hence only RSA specific keywords here 

844 return self.pubKey.encrypt(msg, t=t, h=h, mgf=mgf, L=L) 

845 

846 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 

847 return self.pubKey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L) 

848 

849 def getSignatureHash(self): 

850 """ 

851 Return the hash used by the 'signatureAlgorithm' 

852 """ 

853 tbsCert = self.tbsCertificate 

854 sigAlg = tbsCert.signature 

855 h = hash_by_oid[sigAlg.algorithm.val] 

856 return _get_hash(h) 

857 

858 def setSubjectPublicKeyFromPrivateKey(self, key): 

859 """ 

860 Replace the subjectPublicKeyInfo of this certificate with the one from 

861 the provided key. 

862 """ 

863 if isinstance(key, (PubKey, PrivKey)): 

864 if isinstance(key, PrivKey): 

865 pubkey = key.pubkey 

866 else: 

867 pubkey = key 

868 self.tbsCertificate.subjectPublicKeyInfo = X509_SubjectPublicKeyInfo( 

869 pubkey.der 

870 ) 

871 else: 

872 raise ValueError("Unknown type 'key', should be PubKey or PrivKey") 

873 

874 def resignWith(self, key): 

875 """ 

876 Resign a certificate with a specific key 

877 """ 

878 self.import_from_asn1pkt(key.resignCert(self)) 

879 

880 def remainingDays(self, now=None): 

881 """ 

882 Based on the value of notAfter field, returns the number of 

883 days the certificate will still be valid. The date used for the 

884 comparison is the current and local date, as returned by 

885 time.localtime(), except if 'now' argument is provided another 

886 one. 'now' argument can be given as either a time tuple or a string 

887 representing the date. Accepted format for the string version 

888 are: 

889 

890 - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT' 

891 - '%m/%d/%y' e.g. '01/30/08' (less precise) 

892 

893 If the certificate is no more valid at the date considered, then 

894 a negative value is returned representing the number of days 

895 since it has expired. 

896 

897 The number of days is returned as a float to deal with the unlikely 

898 case of certificates that are still just valid. 

899 """ 

900 if now is None: 

901 now = time.localtime() 

902 elif isinstance(now, str): 

903 try: 

904 if '/' in now: 

905 now = time.strptime(now, '%m/%d/%y') 

906 else: 

907 now = time.strptime(now, '%b %d %H:%M:%S %Y %Z') 

908 except Exception: 

909 warning("Bad time string provided, will use localtime() instead.") # noqa: E501 

910 now = time.localtime() 

911 

912 now = time.mktime(now) 

913 nft = time.mktime(self.notAfter) 

914 diff = (nft - now) / (24. * 3600) 

915 return diff 

916 

def isRevoked(self, crl_list):
    """
    Given a list of trusted CRL (their signature has already been
    verified with trusted anchors), this function returns True if
    the certificate is marked as revoked by one of those CRL, and
    False otherwise (including when 'crl_list' is empty).

    Note that if the Certificate was on hold in a previous CRL and
    is now valid again in a new CRL and both are in the list, it
    will be considered revoked: this is because _all_ CRLs are
    checked (not only the freshest) and revocation status is not
    handled.

    Also note that the check on the issuer is performed on the
    Authority Key Identifier if available in _both_ the CRL and the
    Cert. Otherwise, the issuers are simply compared.
    """
    for crl in crl_list:
        same_akid = (self.authorityKeyID is not None and
                     crl.authorityKeyID is not None and
                     self.authorityKeyID == crl.authorityKeyID)
        if not (same_akid or self.issuer == crl.issuer):
            # This CRL comes from another issuer: it says nothing about us.
            continue
        # Each entry of revoked_cert_serials is indexed so that [0] is the
        # serial number. Only a positive match ends the search: a CRL from
        # our issuer which does not list us must not hide a later one
        # which does.
        if self.serial in (entry[0] for entry in crl.revoked_cert_serials):
            return True
    return False

941 

@property
def pem(self):
    """
    PEM form of the certificate, obtained by passing the 'der' encoding
    and the object's 'marker' to der2pem().
    """
    der_encoding = self.der
    pem_marker = self.marker
    return der2pem(der_encoding, pem_marker)

945 

@property
def der(self):
    """
    Certificate as bytes, obtained by calling bytes() on the wrapped
    'x509Cert' object.
    """
    wrapped = self.x509Cert
    return bytes(wrapped)

949 

def export(self, filename, fmt=None):
    """
    Export certificate in 'fmt' format (DER or PEM) to file 'filename'.

    When 'fmt' is None, PEM is selected if 'filename' ends with ".pem"
    and DER otherwise. An explicit 'fmt' is matched case-insensitively.

    Returns the value returned by the file's write() call.
    Raises ValueError if 'fmt' is neither DER nor PEM; in that case the
    target file is neither created nor truncated.
    """
    if fmt is None:
        fmt = "PEM" if filename.endswith(".pem") else "DER"
    else:
        fmt = fmt.upper()

    # Build the payload before opening the file, so that a bad format or
    # an encoding failure cannot leave an empty file behind.
    if fmt == "DER":
        payload = self.der
    elif fmt == "PEM":
        payload = self.pem.encode()
    else:
        raise ValueError(
            "Unknown export format %r (expected 'DER' or 'PEM')" % fmt
        )

    with open(filename, "wb") as f:
        return f.write(payload)

964 

def show(self):
    """
    Print a four-line summary of the certificate: serial, issuer,
    subject and validity period.
    """
    serial_line = "Serial: %s" % self.serial
    print(serial_line)
    issuer_line = "Issuer: " + self.issuer_str
    print(issuer_line)
    subject_line = "Subject: " + self.subject_str
    print(subject_line)
    validity_line = "Validity: %s to %s" % (self.notBefore_str,
                                            self.notAfter_str)
    print(validity_line)

970 

def __repr__(self):
    """
    One-line description of the certificate built from its subject and
    issuer strings.
    """
    names = (self.subject_str, self.issuer_str)
    return "[X.509 Cert. Subject:%s, Issuer:%s]" % names  # noqa: E501

973 

974 

975################################ 

976# Certificate Revocation Lists # 

977################################ 

978 

class _CRLMaker(_PKIObjMaker):
    """
    Metaclass used to build CRL objects. Unlike for the keys it is not
    strictly required, but following the same model avoids writing
    redundant constructors.
    """
    def __call__(cls, cert_path):
        # The parent metaclass builds the base object; the DER encoding is
        # then read back from its '_der' attribute.
        wrapper = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL")  # noqa: E501
        wrapper.__class__ = CRL
        try:
            parsed_crl = X509_CRL(wrapper._der)
        except Exception:
            raise Exception("Unable to import CRL")
        else:
            wrapper.import_from_asn1pkt(parsed_crl)
        return wrapper

993 

994 

class CRL(metaclass=_CRLMaker):
    """
    Wrapper for the X509_CRL from layers/x509.py.
    Use the 'x509CRL' attribute to access original object.
    """

    @staticmethod
    def _parse_time(value, error_msg):
        """
        Strip an optional trailing 'Z' from the time string 'value' and
        parse the remainder with the "%y%m%d%H%M%S" format.

        Returns a (stripped_string, struct_time) tuple.
        Raises Exception(error_msg) on any failure, including an empty
        or non-string 'value'.
        """
        try:
            if value.endswith("Z"):
                value = value[:-1]
            return value, time.strptime(value, "%y%m%d%H%M%S")
        except Exception:
            raise Exception(error_msg)

    def import_from_asn1pkt(self, crl):
        """
        Populate this wrapper from the parsed CRL object 'crl'.

        Attributes set: x509CRL, tbsCertList (raw form), version, sigAlg,
        issuer, issuer_str, issuer_hash, lastUpdate*, nextUpdate*, number,
        revoked_cert_serials (list of (serial, date string) tuples),
        signatureValue and signatureLen.

        The nextUpdate* attributes and 'number' are None when the CRL
        carries no next update time or no cRLNumber extension.

        Raises Exception("Unable to import CRL") if a time field cannot
        be parsed.
        """
        error_msg = "Unable to import CRL"

        self.x509CRL = crl

        tbsCertList = crl.tbsCertList
        self.tbsCertList = raw(tbsCertList)

        if tbsCertList.version:
            self.version = tbsCertList.version.val + 1
        else:
            self.version = 1
        self.sigAlg = tbsCertList.signature.algorithm.oidname
        self.issuer = tbsCertList.get_issuer()
        self.issuer_str = tbsCertList.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)

        self.lastUpdate_str = tbsCertList.this_update.pretty_time
        _, self.lastUpdate = self._parse_time(tbsCertList.this_update.val,
                                              error_msg)
        self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)

        # Always define the nextUpdate* attributes, so that show() and
        # other readers do not hit an AttributeError on CRLs without a
        # next update time.
        self.nextUpdate = None
        self.nextUpdate_str = None
        self.nextUpdate_str_simple = None
        if tbsCertList.next_update:
            self.nextUpdate_str = tbsCertList.next_update.pretty_time
            _, self.nextUpdate = self._parse_time(
                tbsCertList.next_update.val, error_msg
            )
            self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)

        # Same reasoning for the CRL number, which is an optional extension.
        self.number = None
        if tbsCertList.crlExtensions:
            for extension in tbsCertList.crlExtensions:
                if extension.extnID.oidname == "cRLNumber":
                    self.number = extension.extnValue.cRLNumber.val

        revoked = []
        if tbsCertList.revokedCertificates:
            for cert in tbsCertList.revokedCertificates:
                serial = cert.serialNumber.val
                # The date is only validated here: the stripped string
                # (not the parsed time tuple) is what gets stored.
                date, _ = self._parse_time(cert.revocationDate.val,
                                           error_msg)
                revoked.append((serial, date))
        self.revoked_cert_serials = revoked

        self.signatureValue = raw(crl.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuerCert(self, other):
        """
        Return False if this CRL's issuer_hash differs from the
        subject_hash of 'other'; otherwise return the result of
        other.pubKey.verifyCert(self).
        """
        # This is exactly the same thing as in Cert method.
        if self.issuer_hash != other.subject_hash:
            return False
        return other.pubKey.verifyCert(self)

    def verify(self, anchors):
        """
        Return True iff the CRL is signed by one of the provided anchors.
        """
        return any(self.isIssuerCert(a) for a in anchors)

    def show(self):
        """
        Print the version, signature algorithm, issuer and update times
        of the CRL. nextUpdate is printed as None when the CRL has none.
        """
        print("Version: %d" % self.version)
        print("sigAlg: " + self.sigAlg)
        print("Issuer: " + self.issuer_str)
        print("lastUpdate: %s" % self.lastUpdate_str)
        print("nextUpdate: %s" % self.nextUpdate_str)

1079 

1080 

1081###################### 

1082# Certificate chains # 

1083###################### 

1084 

class Chain(list):
    """
    Basically, an enhanced array of Cert.
    """

    def __init__(self, certList, cert0=None):
        """
        Construct a chain of certificates starting with a self-signed
        certificate (or any certificate submitted by the user)
        and following issuer/subject matching and signature validity.
        If there is exactly one chain to be constructed, it will be,
        but if there are multiple potential chains, there is no guarantee
        that the retained one will be the longest one.
        As Cert and CRL classes both share an isIssuerCert() method,
        the trailing element of a Chain may alternatively be a CRL.

        Note that we do not check AKID/{SKID/issuer/serial} matching,
        nor the presence of keyCertSign in keyUsage extension (if present).

        The 'certList' argument itself is left untouched: candidates are
        consumed from a private copy.
        """
        list.__init__(self, ())
        # Work on a copy so that the caller's list is not emptied as a
        # side effect of building the chain.
        candidates = list(certList)

        if cert0:
            self.append(cert0)
        else:
            for root_candidate in candidates:
                if root_candidate.isSelfSigned():
                    self.append(root_candidate)
                    candidates.remove(root_candidate)
                    break

        if not self:
            # No starting point: the chain stays empty.
            return

        while candidates:
            for c in candidates:
                if c.isIssuerCert(self[-1]):
                    self.append(c)
                    candidates.remove(c)
                    break
            else:
                # No remaining candidate extends the current tail.
                break

    @staticmethod
    def _load_untrusted(untrusted_file):
        """
        Return the list of Cert objects found in the PEM file
        'untrusted_file', or None when no file name is given.
        Raises Exception if the file cannot be read.
        """
        if not untrusted_file:
            return None
        try:
            with open(untrusted_file, "rb") as f:
                untrusted_certs = f.read()
        except Exception:
            raise Exception("Could not read from untrusted_file")
        return [Cert(c) for c in split_pem(untrusted_certs)]

    def verifyChain(self, anchors, untrusted=None):
        """
        Perform verification of certificate chains for that certificate.
        A list of anchors is required. The certificates in the optional
        untrusted list may be used as additional elements to the final chain.
        On par with chain instantiation, only one chain constructed with the
        untrusted candidates will be retained. Eventually, dates are checked.

        Returns the first valid Chain found (starting with the anchor),
        or None when no anchor leads to one.
        """
        untrusted = untrusted or []
        for a in anchors:
            chain = Chain(self + untrusted, a)
            if len(chain) == 1:  # anchor only
                continue
            # check that the chain does not exclusively rely on untrusted
            if not any(c in chain[1:] for c in self):
                continue
            # every element of the chain must still be within its validity
            if any(c.remainingDays() < 0 for c in chain):
                continue
            return chain
        return None

    def verifyChainFromCAFile(self, cafile, untrusted_file=None):
        """
        Does the same job as .verifyChain() but using the list of anchors
        from the cafile. As for .verifyChain(), a list of untrusted
        certificates can be passed (as a file, this time).

        Raises Exception if one of the files cannot be read.
        """
        try:
            with open(cafile, "rb") as f:
                ca_certs = f.read()
        except Exception:
            raise Exception("Could not read from cafile")

        anchors = [Cert(c) for c in split_pem(ca_certs)]
        untrusted = self._load_untrusted(untrusted_file)
        return self.verifyChain(anchors, untrusted)

    def verifyChainFromCAPath(self, capath, untrusted_file=None):
        """
        Does the same job as .verifyChainFromCAFile() but using the list
        of anchors in capath directory. The directory should (only) contain
        certificates files in PEM format. As for .verifyChainFromCAFile(),
        a list of untrusted certificates can be passed as a file
        (concatenation of the certificates in PEM format).

        Raises Exception if the directory cannot be listed, if one of its
        entries cannot be loaded as a certificate, or if untrusted_file
        cannot be read.
        """
        try:
            anchors = []
            for cafile in os.listdir(capath):
                with open(os.path.join(capath, cafile), "rb") as fd:
                    anchors.append(Cert(fd.read()))
        except Exception:
            raise Exception("capath provided is not a valid cert path")

        untrusted = self._load_untrusted(untrusted_file)
        return self.verifyChain(anchors, untrusted)

    def __repr__(self):
        """
        Return a tree-like, multi-line view of the chain: the first
        element is flagged as self-signed or not, and each following
        element is indented by two more spaces. An empty chain gives "".
        """
        if not self:
            return ""
        head = self[0]
        if not head.isSelfSigned():
            first_line = "__ %s [Not Self Signed]" % head.subject_str
        else:
            first_line = "__ %s [Self Signed]" % head.subject_str
        following = [
            "%s_ %s" % (" " * depth * 2, c.subject_str)
            for depth, c in enumerate(self[1:], 1)
        ]
        # The first line is always newline-terminated, the last one never.
        return first_line + "\n" + "\n".join(following)

1217 return s