1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net>
5# <arno@natisbad.org>
6# 2015, 2016, 2017 Maxence Tury <maxence.tury@ssi.gouv.fr>
7# 2022-2025 Gabriel Potter
8
9r"""
10High-level methods for PKI objects (X.509 certificates, CRLs, CSR, Keys, CMS).
11Supported keys include RSA, ECDSA and EdDSA.
12
13The classes below are wrappers for the ASN.1 objects defined in x509.py.
14
15Example 1: Certificate & Private key
16____________________________________
17
18For instance, here is what you could do in order to modify the subject public
19key info of a 'cert' and then resign it with whatever 'key'::
20
21 >>> from scapy.layers.tls.cert import *
22 >>> cert = Cert("cert.der")
23 >>> k = PrivKeyRSA() # generate a private key
24 >>> cert.setSubjectPublicKeyFromPrivateKey(k)
25 >>> cert.resignWith(k)
26 >>> cert.export("newcert.pem")
27 >>> k.export("mykey.pem")
28
29One could also edit arguments like the serial number, as such::
30
31 >>> from scapy.layers.tls.cert import *
32 >>> c = Cert("mycert.pem")
33 >>> c.tbsCertificate.serialNumber = 0x4B1D
34 >>> k = PrivKey("mykey.pem") # import an existing private key
35 >>> c.resignWith(k)
36 >>> c.export("newcert.pem")
37
38To export the public key of a private key::
39
40 >>> k = PrivKey("mykey.pem")
41 >>> k.pubkey.export("mypubkey.pem")
42
43Example 2: CertList and CertTree
44________________________________
45
46Load a .pem file that contains multiple certificates::
47
48 >>> l = CertList("ca_chain.pem")
49 >>> l.show()
50 0000 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test CA...]
51 0001 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test Client...]
52
53Use 'CertTree' to organize the certificates in a tree::
54
55 >>> tree = CertTree("ca_chain.pem") # or tree = CertTree(l)
56 >>> tree.show()
57 /C=Ulaanbaatar/OU=Scapy Test PKI/CN=Scapy Test CA [Self Signed]
58 /C=FR/OU=Scapy Test PKI/CN=Scapy Test Client [Not Self Signed]
59
60Example 3: Certificate Signing Request (CSR)
61____________________________________________
62
63Scapy's :py:class:`~scapy.layers.tls.cert.CSR` class supports both PKCS#10 and CMC
64formats.
65
66Load and display a CSR::
67
68 >>> csr = CSR("cert.req")
69 >>> csr
70 [CSR Format: CMC, Subject:/O=TestOrg/CN=TestCN, Verified: True]
71 >>> csr.certReq.show()
72 ###[ PKCS10_CertificationRequest ]###
73 \certificationRequestInfo\
74 |###[ PKCS10_CertificationRequestInfo ]###
75 | version = 0x0 <ASN1_INTEGER[0]
76 | \subject \
77 | |###[ X509_RDN ]###
78 | | \rdn \
79 | | |###[ X509_AttributeTypeAndValue ]###
80 | | | type = <ASN1_OID['organizationName']>
81 | | | value = <ASN1_UTF8_STRING[b'TestOrg']>
82 [...]
83
84Get its public key and verify its signature::
85
86 >>> csr.pubkey
87 <scapy.layers.tls.cert.PubKeyRSA at 0x7f3481149310>
88 >>> csr.verifySelf()
89 True
90
91No need for obnoxious openssl tweaking anymore. :)
92"""
93
94import base64
95import enum
96import os
97import time
98import warnings
99
100from scapy.config import conf, crypto_validator
101from scapy.compat import Self
102from scapy.error import warning
103from scapy.utils import binrepr
104from scapy.asn1.asn1 import (
105 ASN1_BIT_STRING,
106 ASN1_NULL,
107 ASN1_OID,
108 ASN1_STRING,
109)
110from scapy.asn1.mib import hash_by_oid
111from scapy.packet import Packet
112from scapy.layers.x509 import (
113 CMS_CertificateChoices,
114 CMS_ContentInfo,
115 CMS_EncapsulatedContentInfo,
116 CMS_IssuerAndSerialNumber,
117 CMS_RevocationInfoChoice,
118 CMS_SignedAttrsForSignature,
119 CMS_SignedData,
120 CMS_SignerInfo,
121 CMS_SubjectKeyIdentifier,
122 ECDSAPrivateKey_OpenSSL,
123 ECDSAPrivateKey,
124 ECDSAPublicKey,
125 EdDSAPrivateKey,
126 EdDSAPublicKey,
127 PKCS10_CertificationRequest,
128 RSAPrivateKey_OpenSSL,
129 RSAPrivateKey,
130 RSAPublicKey,
131 X509_AlgorithmIdentifier,
132 X509_Attribute,
133 X509_AttributeValue,
134 X509_Cert,
135 X509_CRL,
136 X509_SubjectPublicKeyInfo,
137)
138from scapy.layers.tls.crypto.hash import _get_hash
139from scapy.layers.tls.crypto.pkcs1 import (
140 _DecryptAndSignRSA,
141 _EncryptAndVerifyRSA,
142 pkcs_os2ip,
143)
144from scapy.compat import bytes_encode
145
146# Typing imports
147from typing import (
148 List,
149 Optional,
150 Union,
151)
152
153if conf.crypto_valid:
154 from cryptography.exceptions import InvalidSignature
155 from cryptography.hazmat.backends import default_backend
156 from cryptography.hazmat.primitives import hashes
157 from cryptography.hazmat.primitives import serialization
158 from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519, x448
159
160 # cryptography raised the minimum RSA key length to 1024 in 43.0+
161 # https://github.com/pyca/cryptography/pull/10278
162 # but we need still 512 for EXPORT40 ciphers (yes EXPORT is terrible)
163 # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66
164 # The following detects the change and hacks around it using the backend
165
166 try:
167 rsa.generate_private_key(public_exponent=65537, key_size=512)
168 _RSA_512_SUPPORTED = True
169 except ValueError:
170 # cryptography > 43.0
171 _RSA_512_SUPPORTED = False
172 from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl
173
174
175# Maximum allowed size in bytes for a certificate file, to avoid
176# loading huge file when importing a cert
177_MAX_KEY_SIZE = 50 * 1024
178_MAX_CERT_SIZE = 50 * 1024
179_MAX_CRL_SIZE = 10 * 1024 * 1024 # some are that big
180_MAX_CSR_SIZE = 50 * 1024
181
182
183#####################################################################
184# Some helpers
185#####################################################################
186
187
188@conf.commands.register
189def der2pem(der_string, obj="UNKNOWN"):
190 """Convert DER octet string to PEM format (with optional header)"""
191 # Encode a byte string in PEM format. Header advertises <obj> type.
192 pem_string = "-----BEGIN %s-----\n" % obj
193 base64_string = base64.b64encode(der_string).decode()
194 chunks = [base64_string[i : i + 64] for i in range(0, len(base64_string), 64)]
195 pem_string += "\n".join(chunks)
196 pem_string += "\n-----END %s-----\n" % obj
197 return pem_string
198
199
200@conf.commands.register
201def pem2der(pem_string):
202 """Convert PEM string to DER format"""
203 # Encode all lines between the first '-----\n' and the 2nd-to-last '-----'.
204 pem_string = pem_string.replace(b"\r", b"")
205 first_idx = pem_string.find(b"-----\n") + 6
206 if pem_string.find(b"-----BEGIN", first_idx) != -1:
207 raise Exception("pem2der() expects only one PEM-encoded object")
208 last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
209 base64_string = pem_string[first_idx:last_idx]
210 base64_string.replace(b"\n", b"")
211 der_string = base64.b64decode(base64_string)
212 return der_string
213
214
215def split_pem(s):
216 """
217 Split PEM objects. Useful to process concatenated certificates.
218 """
219 pem_strings = []
220 while s != b"":
221 start_idx = s.find(b"-----BEGIN")
222 if start_idx == -1:
223 break
224 end_idx = s.find(b"-----END")
225 if end_idx == -1:
226 raise Exception("Invalid PEM object (missing END tag)")
227 end_idx = s.find(b"\n", end_idx) + 1
228 if end_idx == 0:
229 # There is no final \n
230 end_idx = len(s)
231 pem_strings.append(s[start_idx:end_idx])
232 s = s[end_idx:]
233 return pem_strings
234
235
236class _PKIObj(object):
237 def __init__(self, frmt, der):
238 self.frmt = frmt
239 self._der = der
240
241
242class _PKIObjMaker(type):
243 def __call__(cls, obj_path, obj_max_size, pem_marker=None):
244 # This enables transparent DER and PEM-encoded data imports.
245 # Note that when importing a PEM file with multiple objects (like ECDSA
246 # private keys output by openssl), it will concatenate every object in
247 # order to create a 'der' attribute. When converting a 'multi' DER file
248 # into a PEM file, though, the PEM attribute will not be valid,
249 # because we do not try to identify the class of each object.
250 error_msg = "Unable to import data"
251
252 if obj_path is None:
253 raise Exception(error_msg)
254 obj_path = bytes_encode(obj_path)
255
256 if (b"\x00" not in obj_path) and os.path.isfile(obj_path):
257 _size = os.path.getsize(obj_path)
258 if _size > obj_max_size:
259 raise Exception(error_msg)
260 try:
261 with open(obj_path, "rb") as f:
262 _raw = f.read()
263 except Exception:
264 raise Exception(error_msg)
265 else:
266 _raw = obj_path
267
268 try:
269 if b"-----BEGIN" in _raw:
270 frmt = "PEM"
271 pem = _raw
272 der_list = split_pem(pem)
273 der = b"".join(map(pem2der, der_list))
274 else:
275 frmt = "DER"
276 der = _raw
277 except Exception:
278 raise Exception(error_msg)
279
280 p = _PKIObj(frmt, der)
281 return p
282
283
284#####################################################################
285# PKI objects wrappers
286#####################################################################
287
288###############
289# Public Keys #
290###############
291
292
293class _PubKeyFactory(_PKIObjMaker):
294 """
295 Metaclass for PubKey creation.
296 It casts the appropriate class on the fly, then fills in
297 the appropriate attributes with import_from_asn1pkt() submethod.
298 """
299
300 def __call__(cls, key_path=None, cryptography_obj=None):
301 # This allows to import cryptography objects directly
302 if cryptography_obj is not None:
303 obj = type.__call__(cls)
304 obj.__class__ = cls
305 obj.frmt = "original"
306 obj.marker = "PUBLIC KEY"
307 obj.pubkey = cryptography_obj
308 return obj
309
310 if key_path is None:
311 obj = type.__call__(cls)
312 if cls is PubKey:
313 cls = PubKeyRSA
314 obj.__class__ = cls
315 obj.frmt = "original"
316 obj.fill_and_store()
317 return obj
318
319 # This deals with the rare RSA 'kx export' call.
320 if isinstance(key_path, tuple):
321 obj = type.__call__(cls)
322 obj.__class__ = PubKeyRSA
323 obj.frmt = "tuple"
324 obj.import_from_tuple(key_path)
325 return obj
326
327 # Now for the usual calls, key_path may be the path to either:
328 # _an X509_SubjectPublicKeyInfo, as processed by openssl;
329 # _an RSAPublicKey;
330 # _an ECDSAPublicKey;
331 # _an EdDSAPublicKey.
332 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
333 try:
334 spki = X509_SubjectPublicKeyInfo(obj._der)
335 pubkey = spki.subjectPublicKey
336 if isinstance(pubkey, RSAPublicKey):
337 obj.__class__ = PubKeyRSA
338 obj.import_from_asn1pkt(pubkey)
339 elif isinstance(pubkey, ECDSAPublicKey):
340 obj.__class__ = PubKeyECDSA
341 obj.import_from_der(obj._der)
342 elif isinstance(pubkey, EdDSAPublicKey):
343 obj.__class__ = PubKeyEdDSA
344 obj.import_from_der(obj._der)
345 else:
346 raise
347 obj.marker = "PUBLIC KEY"
348 except Exception:
349 try:
350 pubkey = RSAPublicKey(obj._der)
351 obj.__class__ = PubKeyRSA
352 obj.import_from_asn1pkt(pubkey)
353 obj.marker = "RSA PUBLIC KEY"
354 except Exception:
355 # We cannot import an ECDSA public key without curve knowledge
356 if conf.debug_dissector:
357 raise
358 raise Exception("Unable to import public key")
359 return obj
360
361
362class PubKey(metaclass=_PubKeyFactory):
363 """
364 Parent class for PubKeyRSA, PubKeyECDSA and PubKeyEdDSA.
365 Provides common verifyCert() and export() methods.
366 """
367
368 def verifyCert(self, cert):
369 """Verifies either a Cert or an X509_Cert."""
370 h = _get_cert_sig_hashname(cert)
371 tbsCert = cert.tbsCertificate
372 sigVal = bytes(cert.signatureValue)
373 return self.verify(bytes(tbsCert), sigVal, h=h, t="pkcs")
374
375 def verifyCsr(self, csr):
376 """Verifies a CSR."""
377 h = _get_csr_sig_hashname(csr)
378 certReqInfo = csr.certReq.certificationRequestInfo
379 sigVal = bytes(csr.certReq.signature)
380 return self.verify(bytes(certReqInfo), sigVal, h=h, t="pkcs")
381
382 @property
383 def pem(self):
384 return der2pem(self.der, self.marker)
385
386 @property
387 def der(self):
388 return self.pubkey.public_bytes(
389 encoding=serialization.Encoding.DER,
390 format=serialization.PublicFormat.SubjectPublicKeyInfo,
391 )
392
393 def public_numbers(self, *args, **kwargs):
394 return self.pubkey.public_numbers(*args, **kwargs)
395
396 @property
397 def key_size(self):
398 return self.pubkey.key_size
399
400 def export(self, filename, fmt=None):
401 """
402 Export public key in 'fmt' format (DER or PEM) to file 'filename'
403 """
404 if fmt is None:
405 if filename.endswith(".pem"):
406 fmt = "PEM"
407 else:
408 fmt = "DER"
409 with open(filename, "wb") as f:
410 if fmt == "DER":
411 return f.write(self.der)
412 elif fmt == "PEM":
413 return f.write(self.pem.encode())
414
415 @crypto_validator
416 def verify(self, msg, sig, h="sha256", **kwargs):
417 """
418 Verify signed data.
419 """
420 raise NotImplementedError
421
422
423class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
424 """
425 Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py
426 Use the 'key' attribute to access original object.
427 """
428
429 @crypto_validator
430 def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
431 pubExp = pubExp or 65537
432 if not modulus:
433 real_modulusLen = modulusLen or 2048
434 if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
435 # cryptography > 43.0 compatibility
436 private_key = rust_openssl.rsa.generate_private_key(
437 public_exponent=pubExp,
438 key_size=real_modulusLen,
439 )
440 else:
441 private_key = rsa.generate_private_key(
442 public_exponent=pubExp,
443 key_size=real_modulusLen,
444 backend=default_backend(),
445 )
446 self.pubkey = private_key.public_key()
447 else:
448 real_modulusLen = len(binrepr(modulus))
449 if modulusLen and real_modulusLen != modulusLen:
450 warning("modulus and modulusLen do not match!")
451 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
452 self.pubkey = pubNum.public_key(default_backend())
453
454 self.marker = "PUBLIC KEY"
455
456 # Lines below are only useful for the legacy part of pkcs1.py
457 pubNum = self.pubkey.public_numbers()
458 self._modulusLen = real_modulusLen
459 self._modulus = pubNum.n
460 self._pubExp = pubNum.e
461
462 @crypto_validator
463 def import_from_tuple(self, tup):
464 # this is rarely used
465 e, m, mLen = tup
466 if isinstance(m, bytes):
467 m = pkcs_os2ip(m)
468 if isinstance(e, bytes):
469 e = pkcs_os2ip(e)
470 self.fill_and_store(modulus=m, pubExp=e)
471
472 def import_from_asn1pkt(self, pubkey):
473 modulus = pubkey.modulus.val
474 pubExp = pubkey.publicExponent.val
475 self.fill_and_store(modulus=modulus, pubExp=pubExp)
476
477 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
478 # no ECDSA encryption support, hence no ECDSA specific keywords here
479 return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L)
480
481 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
482 return _EncryptAndVerifyRSA.verify(self, msg, sig, t=t, h=h, mgf=mgf, L=L)
483
484
485class PubKeyECDSA(PubKey):
486 """
487 Wrapper for ECDSA keys based on the cryptography library.
488 Use the 'key' attribute to access original object.
489 """
490
491 @crypto_validator
492 def fill_and_store(self, curve=None):
493 curve = curve or ec.SECP256R1
494 private_key = ec.generate_private_key(curve(), default_backend())
495 self.pubkey = private_key.public_key()
496
497 @crypto_validator
498 def import_from_der(self, pubkey):
499 # No lib support for explicit curves nor compressed points.
500 self.pubkey = serialization.load_der_public_key(
501 pubkey,
502 backend=default_backend(),
503 )
504
505 def encrypt(self, msg, h="sha256", **kwargs):
506 raise Exception("No ECDSA encryption support")
507
508 @crypto_validator
509 def verify(self, msg, sig, h="sha256", **kwargs):
510 # 'sig' should be a DER-encoded signature, as per RFC 3279
511 try:
512 self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
513 return True
514 except InvalidSignature:
515 return False
516
517
518class PubKeyEdDSA(PubKey):
519 """
520 Wrapper for EdDSA keys based on the cryptography library.
521 Use the 'key' attribute to access original object.
522 """
523
524 @crypto_validator
525 def fill_and_store(self, curve=None):
526 curve = curve or x25519.X25519PrivateKey
527 private_key = curve.generate()
528 self.pubkey = private_key.public_key()
529
530 @crypto_validator
531 def import_from_der(self, pubkey):
532 self.pubkey = serialization.load_der_public_key(
533 pubkey,
534 backend=default_backend(),
535 )
536
537 def encrypt(self, msg, **kwargs):
538 raise Exception("No EdDSA encryption support")
539
540 @crypto_validator
541 def verify(self, msg, sig, **kwargs):
542 # 'sig' should be a DER-encoded signature, as per RFC 3279
543 try:
544 self.pubkey.verify(sig, msg)
545 return True
546 except InvalidSignature:
547 return False
548
549
550################
551# Private Keys #
552################
553
554
555class _PrivKeyFactory(_PKIObjMaker):
556 """
557 Metaclass for PrivKey creation.
558 It casts the appropriate class on the fly, then fills in
559 the appropriate attributes with import_from_asn1pkt() submethod.
560 """
561
562 def __call__(cls, key_path=None, cryptography_obj=None):
563 """
564 key_path may be the path to either:
565 _an RSAPrivateKey_OpenSSL (as generated by openssl);
566 _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
567 _an RSAPrivateKey;
568 _an ECDSAPrivateKey.
569 """
570 # This allows to import cryptography objects directly
571 if cryptography_obj is not None:
572 # We (stupidly) need to go through the whole import process because RSA
573 # does more than just importing the cryptography objects...
574 obj = _PKIObj(
575 "DER",
576 cryptography_obj.private_bytes(
577 encoding=serialization.Encoding.DER,
578 format=serialization.PrivateFormat.PKCS8,
579 encryption_algorithm=serialization.NoEncryption(),
580 ),
581 )
582 elif key_path is None:
583 obj = type.__call__(cls)
584 if cls is PrivKey:
585 cls = PrivKeyECDSA
586 obj.__class__ = cls
587 obj.frmt = "original"
588 obj.fill_and_store()
589 return obj
590 else:
591 # Load from file
592 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
593
594 try:
595 privkey = RSAPrivateKey_OpenSSL(obj._der)
596 privkey = privkey.privateKey
597 obj.__class__ = PrivKeyRSA
598 obj.marker = "PRIVATE KEY"
599 except Exception:
600 try:
601 privkey = ECDSAPrivateKey_OpenSSL(obj._der)
602 privkey = privkey.privateKey
603 obj.__class__ = PrivKeyECDSA
604 obj.marker = "EC PRIVATE KEY"
605 except Exception:
606 try:
607 privkey = RSAPrivateKey(obj._der)
608 obj.__class__ = PrivKeyRSA
609 obj.marker = "RSA PRIVATE KEY"
610 except Exception:
611 try:
612 privkey = ECDSAPrivateKey(obj._der)
613 obj.__class__ = PrivKeyECDSA
614 obj.marker = "EC PRIVATE KEY"
615 except Exception:
616 try:
617 privkey = EdDSAPrivateKey(obj._der)
618 obj.__class__ = PrivKeyEdDSA
619 obj.marker = "PRIVATE KEY"
620 except Exception:
621 raise Exception("Unable to import private key")
622 try:
623 obj.import_from_asn1pkt(privkey)
624 except ImportError:
625 pass
626 return obj
627
628
629class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING):
630 """A ASN1_BIT_STRING that ignores BER encoding"""
631
632 def __bytes__(self):
633 return self.val_readable
634
635 __str__ = __bytes__
636
637
638class PrivKey(metaclass=_PrivKeyFactory):
639 """
640 Parent class for PrivKeyRSA, PrivKeyECDSA and PrivKeyEdDSA.
641 Provides common signTBSCert(), resignCert(), verifyCert()
642 and export() methods.
643 """
644
645 def signTBSCert(self, tbsCert, h="sha256"):
646 """
647 Note that this will always copy the signature field from the
648 tbsCertificate into the signatureAlgorithm field of the result,
649 regardless of the coherence between its contents (which might
650 indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2).
651
652 There is a small inheritance trick for the computation of sigVal
653 below: in order to use a sign() method which would apply
654 to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the
655 subclasses accept any argument, be it from the RSA or ECDSA world,
656 and then they keep the ones they're interested in.
657 Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign().
658 """
659 sigAlg = tbsCert.signature
660 h = h or hash_by_oid[sigAlg.algorithm.val]
661 sigVal = self.sign(bytes(tbsCert), h=h, t="pkcs")
662 c = X509_Cert()
663 c.tbsCertificate = tbsCert
664 c.signatureAlgorithm = sigAlg
665 c.signatureValue = _Raw_ASN1_BIT_STRING(sigVal, readable=True)
666 return c
667
668 def resignCert(self, cert):
669 """Rewrite the signature of either a Cert or an X509_Cert."""
670 return self.signTBSCert(cert.tbsCertificate, h=None)
671
672 def verifyCert(self, cert):
673 """Verifies either a Cert or an X509_Cert."""
674 return self.pubkey.verifyCert(cert)
675
676 def verifyCsr(self, cert):
677 """Verifies either a CSR."""
678 return self.pubkey.verifyCsr(cert)
679
680 @property
681 def pem(self):
682 return der2pem(self.der, self.marker)
683
684 @property
685 def der(self):
686 return self.key.private_bytes(
687 encoding=serialization.Encoding.DER,
688 format=serialization.PrivateFormat.PKCS8,
689 encryption_algorithm=serialization.NoEncryption(),
690 )
691
692 def export(self, filename, fmt=None):
693 """
694 Export private key in 'fmt' format (DER or PEM) to file 'filename'
695 """
696 if fmt is None:
697 if filename.endswith(".pem"):
698 fmt = "PEM"
699 else:
700 fmt = "DER"
701 with open(filename, "wb") as f:
702 if fmt == "DER":
703 return f.write(self.der)
704 elif fmt == "PEM":
705 return f.write(self.pem.encode())
706
707 @crypto_validator
708 def sign(self, data, h="sha256", **kwargs):
709 """
710 Sign data.
711 """
712 raise NotImplementedError
713
714 @crypto_validator
715 def verify(self, msg, sig, h="sha256", **kwargs):
716 """
717 Verify signed data.
718 """
719 raise NotImplementedError
720
721
722class PrivKeyRSA(PrivKey, _DecryptAndSignRSA):
723 """
724 Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py
725 Use the 'key' attribute to access original object.
726 """
727
728 @crypto_validator
729 def fill_and_store(
730 self,
731 modulus=None,
732 modulusLen=None,
733 pubExp=None,
734 prime1=None,
735 prime2=None,
736 coefficient=None,
737 exponent1=None,
738 exponent2=None,
739 privExp=None,
740 ):
741 pubExp = pubExp or 65537
742 if None in [
743 modulus,
744 prime1,
745 prime2,
746 coefficient,
747 privExp,
748 exponent1,
749 exponent2,
750 ]:
751 # note that the library requires every parameter
752 # in order to call RSAPrivateNumbers(...)
753 # if one of these is missing, we generate a whole new key
754 real_modulusLen = modulusLen or 2048
755 if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
756 # cryptography > 43.0 compatibility
757 self.key = rust_openssl.rsa.generate_private_key(
758 public_exponent=pubExp,
759 key_size=real_modulusLen,
760 )
761 else:
762 self.key = rsa.generate_private_key(
763 public_exponent=pubExp,
764 key_size=real_modulusLen,
765 backend=default_backend(),
766 )
767 pubkey = self.key.public_key()
768 else:
769 real_modulusLen = len(binrepr(modulus))
770 if modulusLen and real_modulusLen != modulusLen:
771 warning("modulus and modulusLen do not match!")
772 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
773 privNum = rsa.RSAPrivateNumbers(
774 p=prime1,
775 q=prime2,
776 dmp1=exponent1,
777 dmq1=exponent2,
778 iqmp=coefficient,
779 d=privExp,
780 public_numbers=pubNum,
781 )
782 self.key = privNum.private_key(default_backend())
783 pubkey = self.key.public_key()
784
785 self.marker = "PRIVATE KEY"
786
787 # Lines below are only useful for the legacy part of pkcs1.py
788 pubNum = pubkey.public_numbers()
789 self._modulusLen = real_modulusLen
790 self._modulus = pubNum.n
791 self._pubExp = pubNum.e
792
793 self.pubkey = PubKeyRSA((pubNum.e, pubNum.n, real_modulusLen))
794
795 def import_from_asn1pkt(self, privkey):
796 modulus = privkey.modulus.val
797 pubExp = privkey.publicExponent.val
798 privExp = privkey.privateExponent.val
799 prime1 = privkey.prime1.val
800 prime2 = privkey.prime2.val
801 exponent1 = privkey.exponent1.val
802 exponent2 = privkey.exponent2.val
803 coefficient = privkey.coefficient.val
804 self.fill_and_store(
805 modulus=modulus,
806 pubExp=pubExp,
807 privExp=privExp,
808 prime1=prime1,
809 prime2=prime2,
810 exponent1=exponent1,
811 exponent2=exponent2,
812 coefficient=coefficient,
813 )
814
815 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
816 return self.pubkey.verify(
817 msg=msg,
818 sig=sig,
819 t=t,
820 h=h,
821 mgf=mgf,
822 L=L,
823 )
824
825 def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
826 return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L)
827
828
829class PrivKeyECDSA(PrivKey):
830 """
831 Wrapper for ECDSA keys based on SigningKey from ecdsa library.
832 Use the 'key' attribute to access original object.
833 """
834
835 @crypto_validator
836 def fill_and_store(self, curve=None):
837 curve = curve or ec.SECP256R1
838 self.key = ec.generate_private_key(curve(), default_backend())
839 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
840 self.marker = "EC PRIVATE KEY"
841
842 @crypto_validator
843 def import_from_asn1pkt(self, privkey):
844 self.key = serialization.load_der_private_key(
845 bytes(privkey), None, backend=default_backend()
846 )
847 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
848 self.marker = "EC PRIVATE KEY"
849
850 @crypto_validator
851 def verify(self, msg, sig, h="sha256", **kwargs):
852 return self.pubkey.verify(msg=msg, sig=sig, h=h, **kwargs)
853
854 @crypto_validator
855 def sign(self, data, h="sha256", **kwargs):
856 return self.key.sign(data, ec.ECDSA(_get_hash(h)))
857
858
859class PrivKeyEdDSA(PrivKey):
860 """
861 Wrapper for EdDSA keys
862 Use the 'key' attribute to access original object.
863 """
864
865 @crypto_validator
866 def fill_and_store(self, curve=None):
867 curve = curve or x25519.X25519PrivateKey
868 self.key = curve.generate()
869 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
870 self.marker = "PRIVATE KEY"
871
872 @crypto_validator
873 def import_from_asn1pkt(self, privkey):
874 self.key = serialization.load_der_private_key(
875 bytes(privkey), None, backend=default_backend()
876 )
877 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
878 self.marker = "PRIVATE KEY"
879
880 @crypto_validator
881 def verify(self, msg, sig, **kwargs):
882 return self.pubkey.verify(msg=msg, sig=sig, **kwargs)
883
884 @crypto_validator
885 def sign(self, data, **kwargs):
886 return self.key.sign(data)
887
888
889################
890# Certificates #
891################
892
893
894class _CertMaker(_PKIObjMaker):
895 """
896 Metaclass for Cert creation. It is not necessary as it was for the keys,
897 but we reuse the model instead of creating redundant constructors.
898 """
899
900 def __call__(cls, cert_path=None, cryptography_obj=None):
901 # This allows to import cryptography objects directly
902 if cryptography_obj is not None:
903 obj = _PKIObj(
904 "DER",
905 cryptography_obj.public_bytes(
906 encoding=serialization.Encoding.DER,
907 ),
908 )
909 else:
910 # Load from file
911 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CERT_SIZE, "CERTIFICATE")
912 obj.__class__ = Cert
913 obj.marker = "CERTIFICATE"
914 try:
915 cert = X509_Cert(obj._der)
916 except Exception:
917 if conf.debug_dissector:
918 raise
919 raise Exception("Unable to import certificate")
920 obj.import_from_asn1pkt(cert)
921 return obj
922
923
924def _get_cert_sig_hashname(cert):
925 """
926 Return the hash associated with the signature algorithm of a certificate.
927 """
928 tbsCert = cert.tbsCertificate
929 sigAlg = tbsCert.signature
930 return hash_by_oid[sigAlg.algorithm.val]
931
932
933def _get_csr_sig_hashname(csr):
934 """
935 Return the hash associated with the signature algorithm of a CSR.
936 """
937 certReq = csr.certReq
938 sigAlg = certReq.signatureAlgorithm
939 return hash_by_oid[sigAlg.algorithm.val]
940
941
942class Cert(metaclass=_CertMaker):
943 """
944 Wrapper for the X509_Cert from layers/x509.py.
945 Use the 'x509Cert' attribute to access original object.
946 """
947
948 def import_from_asn1pkt(self, cert):
949 error_msg = "Unable to import certificate"
950
951 self.x509Cert = cert
952
953 tbsCert = cert.tbsCertificate
954
955 if tbsCert.version:
956 self.version = tbsCert.version.val + 1
957 else:
958 self.version = 1
959 self.serial = tbsCert.serialNumber.val
960 self.sigAlg = tbsCert.signature.algorithm.oidname
961 self.issuer = tbsCert.get_issuer()
962 self.issuer_str = tbsCert.get_issuer_str()
963 self.issuer_hash = hash(self.issuer_str)
964 self.subject = tbsCert.get_subject()
965 self.subject_str = tbsCert.get_subject_str()
966 self.subject_hash = hash(self.subject_str)
967 self.authorityKeyID = None
968
969 self.notBefore_str = tbsCert.validity.not_before.pretty_time
970 try:
971 self.notBefore = tbsCert.validity.not_before.datetime.timetuple()
972 except ValueError:
973 raise Exception(error_msg)
974 self.notBefore_str_simple = time.strftime("%x", self.notBefore)
975
976 self.notAfter_str = tbsCert.validity.not_after.pretty_time
977 try:
978 self.notAfter = tbsCert.validity.not_after.datetime.timetuple()
979 except ValueError:
980 raise Exception(error_msg)
981 self.notAfter_str_simple = time.strftime("%x", self.notAfter)
982
983 self.pubkey = PubKey(bytes(tbsCert.subjectPublicKeyInfo))
984
985 if tbsCert.extensions:
986 for extn in tbsCert.extensions:
987 if extn.extnID.oidname == "basicConstraints":
988 self.cA = False
989 if extn.extnValue.cA:
990 self.cA = not (extn.extnValue.cA.val == 0)
991 elif extn.extnID.oidname == "keyUsage":
992 self.keyUsage = extn.extnValue.get_keyUsage()
993 elif extn.extnID.oidname == "extKeyUsage":
994 self.extKeyUsage = extn.extnValue.get_extendedKeyUsage()
995 elif extn.extnID.oidname == "authorityKeyIdentifier":
996 self.authorityKeyID = extn.extnValue.keyIdentifier.val
997
998 self.signatureValue = bytes(cert.signatureValue)
999 self.signatureLen = len(self.signatureValue)
1000
1001 def isIssuer(self, other):
1002 """
1003 True if 'other' issued 'self', i.e.:
1004 - self.issuer == other.subject
1005 - self is signed by other
1006 """
1007 if self.issuer_hash != other.subject_hash:
1008 return False
1009 return other.pubkey.verifyCert(self)
1010
1011 def isIssuerCert(self, other):
1012 return self.isIssuer(other)
1013
1014 def isSelfSigned(self):
1015 """
1016 Return True if the certificate is self-signed:
1017 - issuer and subject are the same
1018 - the signature of the certificate is valid.
1019 """
1020 if self.issuer_hash == self.subject_hash:
1021 return self.isIssuer(self)
1022 return False
1023
1024 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
1025 # no ECDSA *encryption* support, hence only RSA specific keywords here
1026 return self.pubkey.encrypt(msg, t=t, h=h, mgf=mgf, L=L)
1027
1028 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
1029 return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)
1030
1031 def getCertSignatureHash(self):
1032 """
1033 Return the hash cryptography object used by the 'signatureAlgorithm'
1034 """
1035 return _get_hash(_get_cert_sig_hashname(self))
1036
1037 def setSubjectPublicKeyFromPrivateKey(self, key):
1038 """
1039 Replace the subjectPublicKeyInfo of this certificate with the one from
1040 the provided key.
1041 """
1042 if isinstance(key, (PubKey, PrivKey)):
1043 if isinstance(key, PrivKey):
1044 pubkey = key.pubkey
1045 else:
1046 pubkey = key
1047 self.tbsCertificate.subjectPublicKeyInfo = X509_SubjectPublicKeyInfo(
1048 pubkey.der
1049 )
1050 else:
1051 raise ValueError("Unknown type 'key', should be PubKey or PrivKey")
1052
1053 def resignWith(self, key):
1054 """
1055 Resign a certificate with a specific key
1056 """
1057 self.import_from_asn1pkt(key.resignCert(self))
1058
1059 def remainingDays(self, now=None):
1060 """
1061 Based on the value of notAfter field, returns the number of
1062 days the certificate will still be valid. The date used for the
1063 comparison is the current and local date, as returned by
1064 time.localtime(), except if 'now' argument is provided another
1065 one. 'now' argument can be given as either a time tuple or a string
1066 representing the date. Accepted format for the string version
1067 are:
1068
1069 - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
1070 - '%m/%d/%y' e.g. '01/30/08' (less precise)
1071
1072 If the certificate is no more valid at the date considered, then
1073 a negative value is returned representing the number of days
1074 since it has expired.
1075
1076 The number of days is returned as a float to deal with the unlikely
1077 case of certificates that are still just valid.
1078 """
1079 if now is None:
1080 now = time.localtime()
1081 elif isinstance(now, str):
1082 try:
1083 if "/" in now:
1084 now = time.strptime(now, "%m/%d/%y")
1085 else:
1086 now = time.strptime(now, "%b %d %H:%M:%S %Y %Z")
1087 except Exception:
1088 warning("Bad time string provided, will use localtime() instead.")
1089 now = time.localtime()
1090
1091 now = time.mktime(now)
1092 nft = time.mktime(self.notAfter)
1093 diff = (nft - now) / (24.0 * 3600)
1094 return diff
1095
1096 def isRevoked(self, crl_list):
1097 """
1098 Given a list of trusted CRL (their signature has already been
1099 verified with trusted anchors), this function returns True if
1100 the certificate is marked as revoked by one of those CRL.
1101
1102 Note that if the Certificate was on hold in a previous CRL and
1103 is now valid again in a new CRL and bot are in the list, it
1104 will be considered revoked: this is because _all_ CRLs are
1105 checked (not only the freshest) and revocation status is not
1106 handled.
1107
1108 Also note that the check on the issuer is performed on the
1109 Authority Key Identifier if available in _both_ the CRL and the
1110 Cert. Otherwise, the issuers are simply compared.
1111 """
1112 for c in crl_list:
1113 if (
1114 self.authorityKeyID is not None
1115 and c.authorityKeyID is not None
1116 and self.authorityKeyID == c.authorityKeyID
1117 ):
1118 return self.serial in (x[0] for x in c.revoked_cert_serials)
1119 elif self.issuer == c.issuer:
1120 return self.serial in (x[0] for x in c.revoked_cert_serials)
1121 return False
1122
1123 @property
1124 def tbsCertificate(self):
1125 return self.x509Cert.tbsCertificate
1126
1127 @property
1128 def pem(self):
1129 return der2pem(self.der, self.marker)
1130
1131 @property
1132 def der(self):
1133 return bytes(self.x509Cert)
1134
1135 @property
1136 def pubKey(self):
1137 warnings.warn(
1138 "Cert.pubKey is deprecated and will be removed in a future version. "
1139 "Use Cert.pubkey",
1140 DeprecationWarning,
1141 )
1142 return self.pubkey
1143
1144 @property
1145 def extensions(self):
1146 return self.tbsCertificate.extensions
1147
1148 def __eq__(self, other):
1149 return self.der == other.der
1150
1151 def __hash__(self):
1152 return hash(self.der)
1153
1154 def export(self, filename, fmt=None):
1155 """
1156 Export certificate in 'fmt' format (DER or PEM) to file 'filename'
1157 """
1158 if fmt is None:
1159 if filename.endswith(".pem"):
1160 fmt = "PEM"
1161 else:
1162 fmt = "DER"
1163 with open(filename, "wb") as f:
1164 if fmt == "DER":
1165 return f.write(self.der)
1166 elif fmt == "PEM":
1167 return f.write(self.pem.encode())
1168
1169 def show(self):
1170 print("Serial: %s" % self.serial)
1171 print("Issuer: " + self.issuer_str)
1172 print("Subject: " + self.subject_str)
1173 print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str))
1174
1175 def __repr__(self):
1176 return "[X.509 Cert. Subject:%s, Issuer:%s]" % (
1177 self.subject_str,
1178 self.issuer_str,
1179 )
1180
1181
1182################################
1183# Certificate Revocation Lists #
1184################################
1185
1186
1187class _CRLMaker(_PKIObjMaker):
1188 """
1189 Metaclass for CRL creation. It is not necessary as it was for the keys,
1190 but we reuse the model instead of creating redundant constructors.
1191 """
1192
1193 def __call__(cls, cert_path):
1194 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL")
1195 obj.__class__ = CRL
1196 try:
1197 crl = X509_CRL(obj._der)
1198 except Exception:
1199 raise Exception("Unable to import CRL")
1200 obj.import_from_asn1pkt(crl)
1201 return obj
1202
1203
1204class CRL(metaclass=_CRLMaker):
1205 """
1206 Wrapper for the X509_CRL from layers/x509.py.
1207 Use the 'x509CRL' attribute to access original object.
1208 """
1209
1210 def import_from_asn1pkt(self, crl):
1211 error_msg = "Unable to import CRL"
1212
1213 self.x509CRL = crl
1214
1215 tbsCertList = crl.tbsCertList
1216 self.tbsCertList = bytes(tbsCertList)
1217
1218 if tbsCertList.version:
1219 self.version = tbsCertList.version.val + 1
1220 else:
1221 self.version = 1
1222 self.sigAlg = tbsCertList.signature.algorithm.oidname
1223 self.issuer = tbsCertList.get_issuer()
1224 self.issuer_str = tbsCertList.get_issuer_str()
1225 self.issuer_hash = hash(self.issuer_str)
1226
1227 self.lastUpdate_str = tbsCertList.this_update.pretty_time
1228 lastUpdate = tbsCertList.this_update.val
1229 if lastUpdate[-1] == "Z":
1230 lastUpdate = lastUpdate[:-1]
1231 try:
1232 self.lastUpdate = time.strptime(lastUpdate, "%y%m%d%H%M%S")
1233 except Exception:
1234 raise Exception(error_msg)
1235 self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)
1236
1237 self.nextUpdate = None
1238 self.nextUpdate_str_simple = None
1239 if tbsCertList.next_update:
1240 self.nextUpdate_str = tbsCertList.next_update.pretty_time
1241 nextUpdate = tbsCertList.next_update.val
1242 if nextUpdate[-1] == "Z":
1243 nextUpdate = nextUpdate[:-1]
1244 try:
1245 self.nextUpdate = time.strptime(nextUpdate, "%y%m%d%H%M%S")
1246 except Exception:
1247 raise Exception(error_msg)
1248 self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)
1249
1250 if tbsCertList.crlExtensions:
1251 for extension in tbsCertList.crlExtensions:
1252 if extension.extnID.oidname == "cRLNumber":
1253 self.number = extension.extnValue.cRLNumber.val
1254
1255 revoked = []
1256 if tbsCertList.revokedCertificates:
1257 for cert in tbsCertList.revokedCertificates:
1258 serial = cert.serialNumber.val
1259 date = cert.revocationDate.val
1260 if date[-1] == "Z":
1261 date = date[:-1]
1262 try:
1263 time.strptime(date, "%y%m%d%H%M%S")
1264 except Exception:
1265 raise Exception(error_msg)
1266 revoked.append((serial, date))
1267 self.revoked_cert_serials = revoked
1268
1269 self.signatureValue = bytes(crl.signatureValue)
1270 self.signatureLen = len(self.signatureValue)
1271
1272 def isIssuer(self, other):
1273 # This is exactly the same thing as in Cert method.
1274 if self.issuer_hash != other.subject_hash:
1275 return False
1276 return other.pubkey.verifyCert(self)
1277
1278 def verify(self, anchors):
1279 # Return True iff the CRL is signed by one of the provided anchors.
1280 return any(self.isIssuer(a) for a in anchors)
1281
1282 def show(self):
1283 print("Version: %d" % self.version)
1284 print("sigAlg: " + self.sigAlg)
1285 print("Issuer: " + self.issuer_str)
1286 print("lastUpdate: %s" % self.lastUpdate_str)
1287 print("nextUpdate: %s" % self.nextUpdate_str)
1288
1289
1290###############################
1291# Certificate Signing Request #
1292###############################
1293
1294
1295class _CSRMaker(_PKIObjMaker):
1296 """
1297 Metaclass for CSR creation. It is not necessary as it was for the keys,
1298 but we reuse the model instead of creating redundant constructors.
1299 """
1300
1301 def __call__(cls, cert_path):
1302 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CSR_SIZE)
1303 obj.__class__ = CSR
1304 try:
1305 # PKCS#10 format
1306 csr = PKCS10_CertificationRequest(obj._der)
1307 obj.marker = "NEW CERTIFICATE REQUEST"
1308 obj.fmt = CSR.FORMAT.PKCS10
1309 except Exception:
1310 try:
1311 # CMC format
1312 csr = CMS_ContentInfo(obj._der)
1313 obj.marker = "NEW CERTIFICATE REQUEST"
1314 obj.fmt = CSR.FORMAT.CMC
1315 except Exception:
1316 raise Exception("Unable to import CSR")
1317
1318 obj.import_from_asn1pkt(csr)
1319 return obj
1320
1321
1322class CSR(metaclass=_CSRMaker):
1323 """
1324 Wrapper for the CSR formats.
1325 This can handle both PKCS#10 and CMC formats.
1326 """
1327
1328 class FORMAT(enum.Enum):
1329 """
1330 The format used by the CSR.
1331 """
1332
1333 PKCS10 = "PKCS#10"
1334 CMC = "CMC"
1335
1336 def import_from_asn1pkt(self, csr):
1337 self.csr = csr
1338 certReqInfo = self.certReq.certificationRequestInfo
1339
1340 # Subject
1341 self.subject = certReqInfo.get_subject()
1342 self.subject_str = certReqInfo.get_subject_str()
1343 self.subject_hash = hash(self.subject_str)
1344
1345 # pubkey
1346 self.pubkey = PubKey(bytes(certReqInfo.subjectPublicKeyInfo))
1347
1348 # Get the "subjectKeyIdentifier" from the "extensionRequest" attribute
1349 try:
1350 extReq = next(
1351 x.values[0].value
1352 for x in certReqInfo.attributes
1353 if x.type.val == "1.2.840.113549.1.9.14" # extKeyUsage
1354 )
1355 self.sid = next(
1356 x.extnValue.keyIdentifier
1357 for x in extReq.extensions
1358 if x.extnID.val == "2.5.29.14" # subjectKeyIdentifier
1359 )
1360 except StopIteration:
1361 self.sid = None
1362
1363 @property
1364 def certReq(self):
1365 csr = self.csr
1366
1367 if self.fmt == CSR.FORMAT.PKCS10:
1368 return csr
1369 elif self.fmt == CSR.FORMAT.CMC:
1370 if (
1371 csr.contentType.oidname != "id-signedData"
1372 or csr.content.encapContentInfo.eContentType.oidname != "id-cct-PKIData"
1373 ):
1374 raise ValueError("Invalid CMC wrapping !")
1375 req = csr.content.encapContentInfo.eContent.reqSequence[0]
1376 return req.request.certificationRequest
1377 else:
1378 raise ValueError("Invalid CSR format !")
1379
1380 @property
1381 def pem(self):
1382 return der2pem(self.der, self.marker)
1383
1384 @property
1385 def der(self):
1386 return bytes(self.csr)
1387
1388 def __eq__(self, other):
1389 return self.der == other.der
1390
1391 def __hash__(self):
1392 return hash(self.der)
1393
1394 def isIssuer(self, other):
1395 return other.sid == self.sid
1396
1397 def isSelfSigned(self):
1398 return True
1399
1400 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
1401 return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)
1402
1403 def export(self, filename, fmt=None):
1404 """
1405 Export certificate in 'fmt' format (DER or PEM) to file 'filename'
1406 """
1407 if fmt is None:
1408 if filename.endswith(".pem"):
1409 fmt = "PEM"
1410 else:
1411 fmt = "DER"
1412 with open(filename, "wb") as f:
1413 if fmt == "DER":
1414 return f.write(self.der)
1415 elif fmt == "PEM":
1416 return f.write(self.pem.encode())
1417
1418 def show(self):
1419 certReqInfo = self.certReq.certificationRequestInfo
1420
1421 print("Subject: " + self.subject_str)
1422 print("Attributes:")
1423 for attr in certReqInfo.attributes:
1424 print(" - %s" % attr.type.oidname)
1425
1426 def verifySelf(self) -> bool:
1427 """
1428 Verify the signatures of the CSR
1429 """
1430 if self.fmt == self.FORMAT.CMC:
1431 try:
1432 cms_engine = CMS_Engine([self])
1433 cms_engine.verify(self.csr)
1434 return self.pubkey.verifyCsr(self)
1435 except ValueError:
1436 return False
1437 elif self.fmt == self.FORMAT.PKCS10:
1438 return self.pubkey.verifyCsr(self)
1439 else:
1440 return False
1441
1442 def __repr__(self):
1443 return "[CSR Format: %s, Subject:%s, Verified: %s]" % (
1444 self.fmt.value,
1445 self.subject_str,
1446 self.verifySelf(),
1447 )
1448
1449
1450####################
1451# Certificate list #
1452####################
1453
1454
1455class CertList(list):
1456 """
1457 An object that can store a list of Cert objects, load them and export them
1458 into DER/PEM format.
1459 """
1460
1461 def __init__(
1462 self,
1463 certList: Union[Self, List[Cert], List[CSR], Cert, str],
1464 ):
1465 """
1466 Construct a list of certificates/CRLs to be used as list of ROOT certificates.
1467 """
1468 # Parse the certificate list / CA
1469 if isinstance(certList, str):
1470 # It's a path. First get the _PKIObj
1471 obj = _PKIObjMaker.__call__(
1472 CertList, certList, _MAX_CERT_SIZE, "CERTIFICATE"
1473 )
1474
1475 # Then parse the der until there's nothing left
1476 certList = []
1477 payload = obj._der
1478 while payload:
1479 cert = X509_Cert(payload)
1480 if conf.raw_layer in cert.payload:
1481 payload = cert.payload.load
1482 else:
1483 payload = None
1484 cert.remove_payload()
1485 certList.append(Cert(cert))
1486
1487 self.frmt = obj.frmt
1488 elif isinstance(certList, Cert):
1489 certList = [certList]
1490 self.frmt = "PEM"
1491 else:
1492 self.frmt = "PEM"
1493
1494 super(CertList, self).__init__(certList)
1495
1496 def findCertBySid(self, sid):
1497 """
1498 Find a certificate in the list by SubjectIDentifier.
1499 """
1500 for cert in self:
1501 if isinstance(cert, Cert) and isinstance(sid, CMS_IssuerAndSerialNumber):
1502 if cert.issuer == sid.get_issuer():
1503 return cert
1504 elif isinstance(cert, CSR) and isinstance(sid, CMS_SubjectKeyIdentifier):
1505 if cert.sid == sid.sid:
1506 return cert
1507 raise KeyError("Certificate not found !")
1508
1509 def export(self, filename, fmt=None):
1510 """
1511 Export a list of certificates 'fmt' format (DER or PEM) to file 'filename'
1512 """
1513 if fmt is None:
1514 if filename.endswith(".pem"):
1515 fmt = "PEM"
1516 else:
1517 fmt = "DER"
1518 with open(filename, "wb") as f:
1519 if fmt == "DER":
1520 return f.write(self.der)
1521 elif fmt == "PEM":
1522 return f.write(self.pem.encode())
1523
1524 @property
1525 def der(self):
1526 return b"".join(x.der for x in self)
1527
1528 @property
1529 def pem(self):
1530 return "".join(x.pem for x in self)
1531
1532 def __repr__(self):
1533 return "<CertList %s certificates>" % (len(self),)
1534
1535 def show(self):
1536 for i, c in enumerate(self):
1537 print(conf.color_theme.id(i, fmt="%04i"), end=" ")
1538 print(repr(c))
1539
1540
1541######################
1542# Certificate chains #
1543######################
1544
1545
1546class CertTree(CertList):
1547 """
1548 An extension to CertList that additionally has a list of ROOT CAs
1549 that are trusted.
1550
1551 Example::
1552
1553 >>> tree = CertTree("ca_chain.pem")
1554 >>> tree.show()
1555 /CN=DOMAIN-DC1-CA/dc=DOMAIN [Self Signed]
1556 /CN=Administrator/dc=DOMAIN [Not Self Signed]
1557 """
1558
1559 __slots__ = ["frmt", "rootCAs"]
1560
1561 def __init__(
1562 self,
1563 certList: Union[List[Cert], CertList, str],
1564 rootCAs: Union[List[Cert], CertList, Cert, str, None] = None,
1565 ):
1566 """
1567 Construct a chain of certificates that follows issuer/subject matching and
1568 respects signature validity.
1569
1570 Note that we do not check AKID/{SKID/issuer/serial} matching,
1571 nor the presence of keyCertSign in keyUsage extension (if present).
1572
1573 :param certList: a list of Cert/CRL objects (or path to PEM/DER file containing
1574 multiple certs/CRL) to try to chain.
1575 :param rootCAs: (optional) a list of certificates to trust. If not provided,
1576 trusts any self-signed certificates from the certList.
1577 """
1578 # Parse the certificate list
1579 certList = CertList(certList)
1580
1581 # Find the ROOT CAs if store isn't specified
1582 if not rootCAs:
1583 # Build cert store.
1584 self.rootCAs = CertList([x for x in certList if x.isSelfSigned()])
1585 # And remove those certs from the list
1586 for cert in self.rootCAs:
1587 certList.remove(cert)
1588 else:
1589 # Store cert store.
1590 self.rootCAs = CertList(rootCAs)
1591 # And remove those certs from the list if present (remove dups)
1592 for cert in self.rootCAs:
1593 if cert in certList:
1594 certList.remove(cert)
1595
1596 # Append our root CAs to the certList
1597 certList.extend(self.rootCAs)
1598
1599 # Super instantiate
1600 super(CertTree, self).__init__(certList)
1601
1602 @property
1603 def tree(self):
1604 """
1605 Get a tree-like object of the certificate list
1606 """
1607 # We store the tree object as a dictionary that contains children.
1608 tree = [(x, []) for x in self.rootCAs]
1609
1610 # We'll empty this list eventually
1611 certList = list(self)
1612
1613 # We make a list of certificates we have to search children for, and iterate
1614 # through it until it's empty.
1615 todo = list(tree)
1616
1617 # Iterate
1618 while todo:
1619 cert, children = todo.pop()
1620 for c in certList:
1621 # Check if this certificate matches the one we're looking at
1622 if c.isIssuer(cert) and c != cert:
1623 item = (c, [])
1624 children.append(item)
1625 certList.remove(c)
1626 todo.append(item)
1627
1628 return tree
1629
1630 def getchain(self, cert):
1631 """
1632 Return a chain of certificate that points from a ROOT CA to a certificate.
1633 """
1634
1635 def _rec_getchain(chain, curtree):
1636 # See if an element of the current tree signs the cert, if so add it to
1637 # the chain, else recurse.
1638 for c, subtree in curtree:
1639 curchain = chain + [c]
1640 # If 'cert' is issued by c
1641 if cert.isIssuer(c) or c == cert:
1642 # Final node of the chain !
1643 # (add the final cert if not self signed)
1644 if c != cert:
1645 curchain += [cert]
1646 return curchain
1647 else:
1648 # Not the final node of the chain ! Recurse.
1649 curchain = _rec_getchain(curchain, subtree)
1650 if curchain:
1651 return curchain
1652 return None
1653
1654 chain = _rec_getchain([], self.tree)
1655 if chain is not None:
1656 # We add the first certificate to the ROOT in all cases
1657 return CertTree(chain, [chain[0]])
1658 else:
1659 return None
1660
1661 def verify(self, cert):
1662 """
1663 Verify that a certificate is properly signed.
1664 """
1665 # Check that we can find a chain to this certificate
1666 if not self.getchain(cert):
1667 raise ValueError("Certificate verification failed !")
1668
1669 def show(self, ret: bool = False):
1670 """
1671 Return the CertTree as a string certificate tree
1672 """
1673
1674 def _rec_show(c, children, lvl=0):
1675 s = ""
1676 # Process the current CA
1677 if c:
1678 if not c.isSelfSigned():
1679 s += "%s [Not Self Signed]\n" % c.subject_str
1680 else:
1681 s += "%s [Self Signed]\n" % c.subject_str
1682 s = lvl * " " + s
1683 lvl += 1
1684 # Process all sub-CAs at a lower level
1685 for child, subchildren in children:
1686 s += _rec_show(child, subchildren, lvl=lvl)
1687 return s
1688
1689 showed = _rec_show(None, self.tree)
1690 if ret:
1691 return showed
1692 else:
1693 print(showed)
1694
1695 def __repr__(self):
1696 return "<CertTree %s certificates (%s ROOT CA)>" % (
1697 len(self),
1698 len(self.rootCAs),
1699 )
1700
1701
1702#######
1703# CMS #
1704#######
1705
1706# RFC3852
1707
1708
1709class CMS_Engine:
1710 """
1711 A utility class to perform CMS/PKCS7 operations, as specified by RFC3852.
1712
1713 :param store: a ROOT CA certificate list to trust.
1714 :param crls: a list of CRLs to include. This is currently not checked.
1715 """
1716
1717 def __init__(
1718 self,
1719 store: CertList,
1720 crls: List[X509_CRL] = [],
1721 ):
1722 self.store = store
1723 self.crls = crls
1724
1725 def _get_algorithms(self, key: PrivKey, h="sha256") -> ASN1_OID:
1726 """
1727 Get the algorithms matching a private key
1728 """
1729 if isinstance(key, PrivKeyRSA):
1730 # RFC3370 sect 3.2
1731 return (
1732 ASN1_OID("rsaEncryption"),
1733 _get_hash(h),
1734 h,
1735 )
1736 elif isinstance(key, PrivKeyECDSA):
1737 # RFC5753 sect 2.1.1
1738 if h == "sha1":
1739 return (
1740 ASN1_OID("ecdsa-with-SHA1"),
1741 hashes.SHA1(),
1742 "sha1",
1743 )
1744 elif h == "sha224":
1745 return (
1746 ASN1_OID("ecdsa-with-SHA224"),
1747 hashes.SHA224(),
1748 "sha224",
1749 )
1750 elif h == "sha256":
1751 return (
1752 ASN1_OID("ecdsa-with-SHA256"),
1753 hashes.SHA256(),
1754 "sha256",
1755 )
1756 elif h == "sha384":
1757 return (
1758 ASN1_OID("ecdsa-with-SHA384"),
1759 hashes.SHA384(),
1760 "sha384",
1761 )
1762 elif h == "sha512":
1763 return (
1764 ASN1_OID("ecdsa-with-SHA512"),
1765 hashes.SHA512(),
1766 "sha512",
1767 )
1768 else:
1769 raise ValueError("Unknown hash for private key !")
1770 elif isinstance(key, PrivKeyEdDSA):
1771 # RFC8419 sect 2.3
1772 if isinstance(key.key, x25519.X25519PrivateKey):
1773 return (
1774 ASN1_OID("Ed25519"),
1775 hashes.SHA512(),
1776 "sha512",
1777 )
1778 elif isinstance(key.key, x448.X448PrivateKey):
1779 return (
1780 ASN1_OID("Ed448"),
1781 hashes.SHAKE256(64),
1782 "shake256",
1783 )
1784 else:
1785 raise ValueError("Unknown private key type !")
1786
1787 def sign(
1788 self,
1789 message: Union[bytes, Packet],
1790 eContentType: ASN1_OID,
1791 cert: Cert,
1792 key: PrivKey,
1793 dhash: Optional[str] = "sha256",
1794 ):
1795 """
1796 Sign a message using CMS.
1797
1798 :param message: the inner content to sign.
1799 :param eContentType: the OID of the inner content.
1800 :param cert: the certificate whose key to use use for signing.
1801 :param key: the private key to use for signing.
1802 :param dhash: the hash to use for message digest (ECDSA only).
1803
1804 We currently only support X.509 certificates !
1805 """
1806 sigalg, cdhash, dhash = self._get_algorithms(key, h=dhash)
1807
1808 # RFC3852 5.4. Message Digest Calculation Process
1809 hash = hashes.Hash(cdhash)
1810 hash.update(bytes(message))
1811 hashed_message = hash.finalize()
1812
1813 # RFC3852 5.5. Signature Generation Process
1814 signerInfo = CMS_SignerInfo(
1815 version=1,
1816 sid=CMS_IssuerAndSerialNumber(
1817 issuer=cert.tbsCertificate.issuer,
1818 serialNumber=cert.tbsCertificate.serialNumber,
1819 ),
1820 digestAlgorithm=X509_AlgorithmIdentifier(
1821 algorithm=ASN1_OID(dhash),
1822 parameters=ASN1_NULL(0),
1823 ),
1824 signedAttrs=[
1825 X509_Attribute(
1826 type=ASN1_OID("contentType"),
1827 values=[
1828 X509_AttributeValue(value=eContentType),
1829 ],
1830 ),
1831 X509_Attribute(
1832 type=ASN1_OID("messageDigest"),
1833 # "A message-digest attribute MUST have a single attribute value"
1834 values=[
1835 X509_AttributeValue(value=ASN1_STRING(hashed_message)),
1836 ],
1837 ),
1838 ],
1839 signatureAlgorithm=X509_AlgorithmIdentifier(
1840 algorithm=sigalg,
1841 parameters=ASN1_NULL(0),
1842 ),
1843 )
1844 signerInfo.signature = ASN1_STRING(
1845 key.sign(
1846 bytes(
1847 CMS_SignedAttrsForSignature(
1848 signedAttrs=signerInfo.signedAttrs,
1849 )
1850 ),
1851 h=dhash,
1852 )
1853 )
1854
1855 # Build a chain of X509_Cert to ship (but skip the ROOT certificate)
1856 certTree = CertTree(cert, self.store)
1857 certificates = [x.x509Cert for x in certTree if not x.isSelfSigned()]
1858
1859 # Build final structure
1860 return CMS_ContentInfo(
1861 contentType=ASN1_OID("id-signedData"),
1862 content=CMS_SignedData(
1863 version=3 if certificates else 1,
1864 digestAlgorithms=X509_AlgorithmIdentifier(
1865 algorithm=ASN1_OID(dhash),
1866 parameters=ASN1_NULL(0),
1867 ),
1868 encapContentInfo=CMS_EncapsulatedContentInfo(
1869 eContentType=eContentType,
1870 eContent=message,
1871 ),
1872 certificates=(
1873 [CMS_CertificateChoices(certificate=cert) for cert in certificates]
1874 if certificates
1875 else None
1876 ),
1877 crls=(
1878 [CMS_RevocationInfoChoice(crl=crl) for crl in self.crls]
1879 if self.crls
1880 else None
1881 ),
1882 signerInfos=[
1883 signerInfo,
1884 ],
1885 ),
1886 )
1887
1888 def verify(
1889 self,
1890 contentInfo: CMS_ContentInfo,
1891 eContentType: Optional[ASN1_OID] = None,
1892 eContent: Optional[bytes] = None,
1893 no_verify_cert: bool = False,
1894 ):
1895 """
1896 Verify a CMS message against the list of trusted certificates,
1897 and return the unpacked message if the verification succeeds.
1898
1899 :param contentInfo: the ContentInfo whose signature to verify
1900 :param eContentType: if provided, verifies that the content type is valid
1901 :param eContent: in PKCS 7.1, provide the content to verify
1902 :param no_verify_cert: do not check the remote certificate (unsafe)
1903 """
1904 if contentInfo.contentType.oidname != "id-signedData":
1905 raise ValueError("ContentInfo isn't signed !")
1906
1907 signeddata = contentInfo.content
1908
1909 # Build the certificate chain
1910 certificates = []
1911 if signeddata.certificates:
1912 certificates = [Cert(x.certificate) for x in signeddata.certificates]
1913 certTree = CertTree(certificates, self.store)
1914
1915 # Check there's at least one signature
1916 if not signeddata.signerInfos:
1917 raise ValueError("ContentInfo contained no signature !")
1918
1919 # Check all signatures
1920 for signerInfo in signeddata.signerInfos:
1921 sigh = hash_by_oid[signerInfo.signatureAlgorithm.algorithm.val]
1922
1923 # Find certificate in the chain that did this
1924 cert: Cert = certTree.findCertBySid(signerInfo.sid)
1925
1926 # Verify certificate signature
1927 if not no_verify_cert:
1928 certTree.verify(cert)
1929
1930 # Verify the message hash
1931 if signerInfo.signedAttrs:
1932 # Verify the contentType
1933 try:
1934 contentType = next(
1935 x.values[0].value
1936 for x in signerInfo.signedAttrs
1937 if x.type.oidname == "contentType"
1938 )
1939
1940 if contentType != signeddata.encapContentInfo.eContentType:
1941 raise ValueError(
1942 "Inconsistent 'contentType' was detected in packet !"
1943 )
1944
1945 if eContentType is not None and eContentType != contentType:
1946 raise ValueError(
1947 "Expected '%s' but got '%s' contentType !"
1948 % (
1949 eContentType,
1950 contentType,
1951 )
1952 )
1953 except StopIteration:
1954 raise ValueError("Missing contentType in signedAttrs !")
1955
1956 # Verify the messageDigest value
1957 try:
1958 # "A message-digest attribute MUST have a single attribute value"
1959 messageDigest = next(
1960 x.values[0].value
1961 for x in signerInfo.signedAttrs
1962 if x.type.oidname == "messageDigest"
1963 )
1964
1965 if signeddata.encapContentInfo.eContent is not None:
1966 eContent = bytes(signeddata.encapContentInfo.eContent)
1967 elif eContent is None:
1968 raise ValueError("No eContent was provided !")
1969
1970 # Re-calculate hash
1971 h = signerInfo.digestAlgorithm.algorithm.oidname
1972 hash = hashes.Hash(_get_hash(h))
1973 hash.update(eContent)
1974 hashed_message = hash.finalize()
1975
1976 if hashed_message != messageDigest:
1977 raise ValueError("Invalid messageDigest value !")
1978 except StopIteration:
1979 raise ValueError("Missing messageDigest in signedAttrs !")
1980
1981 # Verify the signature
1982 cert.verify(
1983 msg=bytes(
1984 CMS_SignedAttrsForSignature(
1985 signedAttrs=signerInfo.signedAttrs,
1986 )
1987 ),
1988 sig=signerInfo.signature.val,
1989 h=sigh,
1990 )
1991 else:
1992 cert.verify(
1993 msg=bytes(signeddata.encapContentInfo),
1994 sig=signerInfo.signature.val,
1995 h=sigh,
1996 )
1997
1998 # Return the content
1999 return signeddata.encapContentInfo.eContent