1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net>
5# <arno@natisbad.org>
6# 2015, 2016, 2017 Maxence Tury <maxence.tury@ssi.gouv.fr>
7# 2022-2025 Gabriel Potter
8
9r"""
10High-level methods for PKI objects (X.509 certificates, CRLs, CSR, Keys, CMS).
11Supported keys include RSA, ECDSA and EdDSA.
12
13The classes below are wrappers for the ASN.1 objects defined in x509.py.
14
15Example 1: Certificate & Private key
16____________________________________
17
18For instance, here is what you could do in order to modify the subject public
19key info of a 'cert' and then resign it with whatever 'key'::
20
21 >>> from scapy.layers.tls.cert import *
22 >>> cert = Cert("cert.der")
23 >>> k = PrivKeyRSA() # generate a private key
24 >>> cert.setSubjectPublicKeyFromPrivateKey(k)
25 >>> cert.resignWith(k)
26 >>> cert.export("newcert.pem")
27 >>> k.export("mykey.pem")
28
29One could also edit arguments like the serial number, as such::
30
31 >>> from scapy.layers.tls.cert import *
32 >>> c = Cert("mycert.pem")
33 >>> c.tbsCertificate.serialNumber = 0x4B1D
34 >>> k = PrivKey("mykey.pem") # import an existing private key
35 >>> c.resignWith(k)
36 >>> c.export("newcert.pem")
37
38To export the public key of a private key::
39
40 >>> k = PrivKey("mykey.pem")
41 >>> k.pubkey.export("mypubkey.pem")
42
43Example 2: CertList and CertTree
44________________________________
45
46Load a .pem file that contains multiple certificates::
47
48 >>> l = CertList("ca_chain.pem")
49 >>> l.show()
50 0000 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test CA...]
51 0001 [X.509 Cert Subject:/C=FR/OU=Scapy Test PKI/CN=Scapy Test Client...]
52
53Use 'CertTree' to organize the certificates in a tree::
54
55 >>> tree = CertTree("ca_chain.pem") # or tree = CertTree(l)
56 >>> tree.show()
57 /C=FR/OU=Scapy Test PKI/CN=Scapy Test CA [Self Signed]
58 /C=FR/OU=Scapy Test PKI/CN=Scapy Test Client [Not Self Signed]
59
60Example 3: Certificate Signing Request (CSR)
61____________________________________________
62
63Scapy's :py:class:`~scapy.layers.tls.cert.CSR` class supports both PKCS#10 and CMC
64formats.
65
66Load and display a CSR::
67
68 >>> csr = CSR("cert.req")
69 >>> csr
70 [CSR Format: CMC, Subject:/O=TestOrg/CN=TestCN, Verified: True]
71 >>> csr.certReq.show()
72 ###[ PKCS10_CertificationRequest ]###
73 \certificationRequestInfo\
74 |###[ PKCS10_CertificationRequestInfo ]###
75 | version = 0x0 <ASN1_INTEGER[0]>
76 | \subject \
77 | |###[ X509_RDN ]###
78 | | \rdn \
79 | | |###[ X509_AttributeTypeAndValue ]###
80 | | | type = <ASN1_OID['organizationName']>
81 | | | value = <ASN1_UTF8_STRING[b'TestOrg']>
82 [...]
83
84Get its public key and verify its signature::
85
86 >>> csr.pubkey
87 <scapy.layers.tls.cert.PubKeyRSA at 0x7f3481149310>
88 >>> csr.verifySelf()
89 True
90
91No need for obnoxious openssl tweaking anymore. :)
92"""
93
94import base64
95import enum
96import os
97import time
98import warnings
99
100from scapy.config import conf, crypto_validator
101from scapy.compat import Self
102from scapy.error import warning
103from scapy.utils import binrepr
104from scapy.asn1.asn1 import (
105 ASN1_BIT_STRING,
106 ASN1_NULL,
107 ASN1_OID,
108 ASN1_STRING,
109)
110from scapy.asn1.mib import hash_by_oid
111from scapy.packet import Packet
112from scapy.layers.x509 import (
113 CMS_CertificateChoices,
114 CMS_ContentInfo,
115 CMS_EncapsulatedContentInfo,
116 CMS_IssuerAndSerialNumber,
117 CMS_RevocationInfoChoice,
118 CMS_SignedAttrsForSignature,
119 CMS_SignedData,
120 CMS_SignerInfo,
121 CMS_SubjectKeyIdentifier,
122 ECDSAPrivateKey_OpenSSL,
123 ECDSAPrivateKey,
124 ECDSAPublicKey,
125 EdDSAPrivateKey,
126 EdDSAPublicKey,
127 PKCS10_CertificationRequest,
128 RSAPrivateKey_OpenSSL,
129 RSAPrivateKey,
130 RSAPublicKey,
131 X509_AlgorithmIdentifier,
132 X509_Attribute,
133 X509_AttributeValue,
134 X509_Cert,
135 X509_CRL,
136 X509_SubjectPublicKeyInfo,
137)
138from scapy.layers.tls.crypto.pkcs1 import (
139 _DecryptAndSignRSA,
140 _EncryptAndVerifyRSA,
141 _get_hash,
142 pkcs_os2ip,
143)
144from scapy.compat import bytes_encode
145
146# Typing imports
147from typing import (
148 List,
149 Optional,
150 Union,
151)
152
if conf.crypto_valid:
    # Everything in this branch (the cryptography imports, the
    # _RSA_512_SUPPORTED flag and, possibly, rust_openssl) is only defined
    # when conf.crypto_valid is true.
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519

    # cryptography raised the minimum RSA key length to 1024 in 43.0+
    # https://github.com/pyca/cryptography/pull/10278
    # but we still need 512 for EXPORT40 ciphers (yes EXPORT is terrible)
    # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66
    # The following detects the change and hacks around it using the backend

    try:
        # Probe: a ValueError here means 512-bit generation is refused.
        rsa.generate_private_key(public_exponent=65537, key_size=512)
        _RSA_512_SUPPORTED = True
    except ValueError:
        # cryptography > 43.0
        _RSA_512_SUPPORTED = False
        # Used by the RSA wrappers to generate keys shorter than 1024 bits.
        from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl
173
174
# Maximum allowed size in bytes for the file holding each kind of PKI object
# (key, certificate, CRL, CSR), to avoid loading a huge file on import.
# The importer compares the on-disk size against the relevant limit.
_MAX_KEY_SIZE = 50 * 1024
_MAX_CERT_SIZE = 50 * 1024
_MAX_CRL_SIZE = 10 * 1024 * 1024  # some are that big
_MAX_CSR_SIZE = 50 * 1024
181
182
183#####################################################################
184# Some helpers
185#####################################################################
186
187
188@conf.commands.register
def der2pem(der_string, obj="UNKNOWN"):
    """
    Wrap a DER byte string into a PEM text block.

    :param der_string: the DER-encoded bytes to wrap.
    :param obj: label used in the BEGIN/END lines.
    :returns: a str made of the BEGIN line, the base64 payload cut into
        lines of at most 64 characters, and the END line.
    """
    encoded = base64.b64encode(der_string).decode()

    # Cut the base64 text into 64-character rows.
    rows = []
    offset = 0
    while offset < len(encoded):
        rows.append(encoded[offset:offset + 64])
        offset += 64

    return "".join(
        (
            "-----BEGIN %s-----\n" % obj,
            "\n".join(rows),
            "\n-----END %s-----\n" % obj,
        )
    )
198
199
200@conf.commands.register
def pem2der(pem_string):
    """
    Convert a single PEM-encoded object to DER.

    :param pem_string: bytes holding exactly one PEM object. Carriage
        returns are ignored.
    :returns: the base64-decoded payload, as bytes.
    :raises Exception: if a second '-----BEGIN' tag follows the first header.
    """
    # Decode all lines between the first '-----\n' and the 2nd-to-last '-----'.
    pem_string = pem_string.replace(b"\r", b"")
    first_idx = pem_string.find(b"-----\n") + 6
    if pem_string.find(b"-----BEGIN", first_idx) != -1:
        raise Exception("pem2der() expects only one PEM-encoded object")
    last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
    base64_string = pem_string[first_idx:last_idx]
    # bytes are immutable: replace() returns a new object, so keep the result
    # (the original code discarded it).
    base64_string = base64_string.replace(b"\n", b"")
    return base64.b64decode(base64_string)
213
214
def split_pem(s):
    """
    Split PEM objects. Useful to process concatenated certificates.

    :param s: bytes that may hold several PEM objects, possibly separated
        by other content.
    :returns: a list of bytes, one per PEM object, each spanning from its
        '-----BEGIN' tag to the end of the line holding its '-----END' tag.
    :raises Exception: if a BEGIN tag has no END tag after it.
    """
    pem_strings = []
    while s != b"":
        start_idx = s.find(b"-----BEGIN")
        if start_idx == -1:
            break
        # Look for the END tag *after* the BEGIN tag: a stray END tag located
        # before it must not be paired with this object.
        end_idx = s.find(b"-----END", start_idx)
        if end_idx == -1:
            raise Exception("Invalid PEM object (missing END tag)")
        end_idx = s.find(b"\n", end_idx) + 1
        if end_idx == 0:
            # There is no final \n
            end_idx = len(s)
        pem_strings.append(s[start_idx:end_idx])
        s = s[end_idx:]
    return pem_strings
234
235
236class _PKIObj(object):
237 def __init__(self, frmt, der):
238 self.frmt = frmt
239 self._der = der
240
241
class _PKIObjMaker(type):
    """
    Base metaclass whose __call__ loads raw PKI data and returns it as a
    _PKIObj.
    """

    def __call__(cls, obj_path, obj_max_size, pem_marker=None):
        """
        Import DER or PEM data transparently.

        :param obj_path: either the path of an existing file or the data
            itself (anything that is not an existing file is used as data).
        :param obj_max_size: maximum accepted file size, in bytes.
        :param pem_marker: not used by this method.
        :returns: a _PKIObj with 'frmt' set to "PEM" or "DER".
        :raises Exception: if the data cannot be imported.
        """
        # When a PEM input holds several objects (like ECDSA private keys
        # output by openssl), their DER forms are concatenated in order.
        # Converting such a 'multi' DER back to PEM will not give a valid
        # result, because the class of each object is not identified.
        error_msg = "Unable to import data"

        if obj_path is None:
            raise Exception(error_msg)
        obj_path = bytes_encode(obj_path)

        # A NUL byte cannot be part of a path: skip the filesystem lookup.
        is_file = (b"\x00" not in obj_path) and os.path.isfile(obj_path)
        if is_file:
            if os.path.getsize(obj_path) > obj_max_size:
                raise Exception(error_msg)
            try:
                with open(obj_path, "rb") as fd:
                    raw = fd.read()
            except Exception:
                raise Exception(error_msg)
        else:
            raw = obj_path

        if b"-----BEGIN" not in raw:
            return _PKIObj("DER", raw)

        try:
            der = b"".join(pem2der(chunk) for chunk in split_pem(raw))
        except Exception:
            raise Exception(error_msg)
        return _PKIObj("PEM", der)
282
283
284#####################################################################
285# PKI objects wrappers
286#####################################################################
287
288###############
289# Public Keys #
290###############
291
292
class _PubKeyFactory(_PKIObjMaker):
    """
    Metaclass for PubKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """

    def __call__(cls, key_path=None, cryptography_obj=None):
        """
        Build a public key wrapper.

        :param key_path: None (generate a new key), a tuple (RSA 'kx export'
            import), or a path / raw data in DER or PEM format.
        :param cryptography_obj: an existing cryptography public key to wrap
            as-is; takes precedence over key_path.
        :returns: an instance of 'cls' or of the detected PubKey subclass.
        :raises Exception: if the data cannot be parsed as a public key.
        """
        # This allows to import cryptography objects directly
        if cryptography_obj is not None:
            obj = type.__call__(cls)
            obj.__class__ = cls
            obj.frmt = "original"
            obj.marker = "PUBLIC KEY"
            obj.pubkey = cryptography_obj
            return obj

        if key_path is None:
            obj = type.__call__(cls)
            if cls is PubKey:
                # The generic class generates RSA keys by default.
                cls = PubKeyRSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        # This deals with the rare RSA 'kx export' call.
        if isinstance(key_path, tuple):
            obj = type.__call__(cls)
            obj.__class__ = PubKeyRSA
            obj.frmt = "tuple"
            obj.import_from_tuple(key_path)
            return obj

        # Now for the usual calls, key_path may be the path to either:
        # _an X509_SubjectPublicKeyInfo, as processed by openssl;
        # _an RSAPublicKey;
        # _an ECDSAPublicKey;
        # _an EdDSAPublicKey.
        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
        try:
            spki = X509_SubjectPublicKeyInfo(obj._der)
            pubkey = spki.subjectPublicKey
            if isinstance(pubkey, RSAPublicKey):
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
            elif isinstance(pubkey, ECDSAPublicKey):
                obj.__class__ = PubKeyECDSA
                obj.import_from_der(obj._der)
            elif isinstance(pubkey, EdDSAPublicKey):
                obj.__class__ = PubKeyEdDSA
                obj.import_from_der(obj._der)
            else:
                # Explicit exception (a bare 'raise' has nothing to re-raise
                # here); it is caught below to try the RSAPublicKey fallback.
                raise ValueError("Unsupported subjectPublicKey type")
            obj.marker = "PUBLIC KEY"
        except Exception:
            try:
                pubkey = RSAPublicKey(obj._der)
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
                obj.marker = "RSA PUBLIC KEY"
            except Exception:
                # We cannot import an ECDSA public key without curve knowledge
                if conf.debug_dissector:
                    raise
                raise Exception("Unable to import public key")
        return obj
360
361
class PubKey(metaclass=_PubKeyFactory):
    """
    Parent class for PubKeyRSA, PubKeyECDSA and PubKeyEdDSA.
    Provides common verifyCert(), verifyCsr() and export() methods.

    The cryptography public key is read from the 'pubkey' attribute and the
    PEM label from the 'marker' attribute.
    """

    def verifyCert(self, cert):
        """Verifies either a Cert or an X509_Cert."""
        h = _get_cert_sig_hashname(cert)
        tbsCert = cert.tbsCertificate
        sigVal = bytes(cert.signatureValue)
        return self.verify(bytes(tbsCert), sigVal, h=h, t="pkcs")

    def verifyCsr(self, csr):
        """Verifies a CSR."""
        h = _get_csr_sig_hashname(csr)
        certReqInfo = csr.certReq.certificationRequestInfo
        sigVal = bytes(csr.certReq.signature)
        return self.verify(bytes(certReqInfo), sigVal, h=h, t="pkcs")

    @property
    def pem(self):
        """PEM text (str) of the key, labelled with 'marker'."""
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        """DER bytes of the key, in SubjectPublicKeyInfo format."""
        return self.pubkey.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

    def public_numbers(self, *args, **kwargs):
        """Forward to public_numbers() of the underlying key object."""
        return self.pubkey.public_numbers(*args, **kwargs)

    @property
    def key_size(self):
        """The key_size reported by the underlying key object."""
        return self.pubkey.key_size

    def export(self, filename, fmt=None):
        """
        Export public key in 'fmt' format (DER or PEM) to file 'filename'

        :param filename: destination path.
        :param fmt: "DER" or "PEM". If None, "PEM" is used when the filename
            ends with ".pem", "DER" otherwise.
        :returns: the value returned by the file's write().
        :raises ValueError: if 'fmt' is neither "DER" nor "PEM".
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        # Serialize before opening the file, so that an unsupported format
        # never leaves an empty (truncated) file behind.
        if fmt == "DER":
            data = self.der
        elif fmt == "PEM":
            data = self.pem.encode()
        else:
            raise ValueError("Unknown format %r, should be DER or PEM" % (fmt,))
        with open(filename, "wb") as f:
            return f.write(data)

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Verify signed data.
        """
        raise NotImplementedError
421
422
class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
    """
    RSA public key wrapper built on _EncryptAndVerifyRSA (crypto/pkcs1.py).
    The cryptography key object is stored in the 'pubkey' attribute.
    """

    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
        """
        Set up the key from a modulus / exponent pair, or generate a new one
        when no modulus is given (2048 bits unless modulusLen says otherwise).
        """
        pubExp = pubExp or 65537
        if modulus:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            numbers = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            self.pubkey = numbers.public_key(default_backend())
        else:
            real_modulusLen = modulusLen or 2048
            if _RSA_512_SUPPORTED or real_modulusLen >= 1024:
                generated = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            else:
                # cryptography > 43.0 compatibility
                generated = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
            self.pubkey = generated.public_key()

        self.marker = "PUBLIC KEY"

        # Lines below are only useful for the legacy part of pkcs1.py
        stored = self.pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = stored.n
        self._pubExp = stored.e

    @crypto_validator
    def import_from_tuple(self, tup):
        """
        Import from an (exponent, modulus, modulus length) tuple; exponent
        and modulus may be given as bytes. The length item is not used.
        """
        # this is rarely used
        e, m, mLen = tup
        m = pkcs_os2ip(m) if isinstance(m, bytes) else m
        e = pkcs_os2ip(e) if isinstance(e, bytes) else e
        self.fill_and_store(modulus=m, pubExp=e)

    def import_from_asn1pkt(self, pubkey):
        """Import from a parsed packet exposing modulus and publicExponent."""
        self.fill_and_store(
            modulus=pubkey.modulus.val,
            pubExp=pubkey.publicExponent.val,
        )

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        """Encrypt 'msg' through _EncryptAndVerifyRSA.encrypt()."""
        # no ECDSA encryption support, hence no ECDSA specific keywords here
        return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """Verify 'sig' over 'msg' through _EncryptAndVerifyRSA.verify()."""
        return _EncryptAndVerifyRSA.verify(self, msg, sig, t=t, h=h, mgf=mgf, L=L)
483
484
class PubKeyECDSA(PubKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'pubkey' attribute to access original object.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key on 'curve' (a curve class, default SECP256R1)."""
        curve = curve or ec.SECP256R1
        private_key = ec.generate_private_key(curve(), default_backend())
        self.pubkey = private_key.public_key()
        # Needed by the 'pem' property, as done by PubKeyRSA.fill_and_store()
        self.marker = "PUBLIC KEY"

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the key from DER bytes."""
        # No lib support for explicit curves nor compressed points.
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, h="sha256", **kwargs):
        """Always raises: encryption is not available for ECDSA keys."""
        raise Exception("No ECDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Return True if 'sig' is a valid signature of 'msg' with hash 'h',
        False otherwise. Extra keyword arguments are ignored.
        """
        # 'sig' should be a DER-encoded signature, as per RFC 3279
        try:
            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
            return True
        except InvalidSignature:
            return False
516
517
class PubKeyEdDSA(PubKey):
    """
    Wrapper for EdDSA keys based on the cryptography library.
    Use the 'pubkey' attribute to access original object.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key using 'curve', a private key class providing
        generate().
        """
        # NOTE(review): the default is the X25519 private key class; confirm
        # that keys of this type support verify() as called below.
        curve = curve or x25519.X25519PrivateKey
        private_key = curve.generate()
        self.pubkey = private_key.public_key()
        # Needed by the 'pem' property, as done by PubKeyRSA.fill_and_store()
        self.marker = "PUBLIC KEY"

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the key from DER bytes."""
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, **kwargs):
        """Always raises: encryption is not available for EdDSA keys."""
        raise Exception("No EdDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """
        Return True if 'sig' is a valid signature of 'msg', False otherwise.
        Extra keyword arguments are ignored.
        """
        # 'sig' is handed unchanged to the underlying key; no hash is chosen
        # here (presumably a raw EdDSA signature rather than a DER structure).
        try:
            self.pubkey.verify(sig, msg)
            return True
        except InvalidSignature:
            return False
548
549
550################
551# Private Keys #
552################
553
554
class _PrivKeyFactory(_PKIObjMaker):
    """
    Metaclass for PrivKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """

    def __call__(cls, key_path=None, cryptography_obj=None):
        """
        key_path may be the path to either:
            _an RSAPrivateKey_OpenSSL (as generated by openssl);
            _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
            _an RSAPrivateKey;
            _an ECDSAPrivateKey.

        :param key_path: path or raw data (DER or PEM) of the key to import.
        :param cryptography_obj: an existing cryptography private key to
            import; takes precedence over key_path.
        :returns: a PrivKeyRSA, PrivKeyECDSA or PrivKeyEdDSA instance. With
            no argument at all, a new key is generated (ECDSA when called on
            the generic PrivKey class).
        :raises Exception: if the data matches no known private key format.
        """
        # Only generate a new key when nothing was given to import: testing
        # key_path alone would silently discard a provided cryptography_obj.
        if key_path is None and cryptography_obj is None:
            obj = type.__call__(cls)
            if cls is PrivKey:
                cls = PrivKeyECDSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        # This allows to import cryptography objects directly
        if cryptography_obj is not None:
            # We (stupidly) need to go through the whole import process because RSA
            # does more than just importing the cryptography objects...
            obj = _PKIObj(
                "DER",
                cryptography_obj.private_bytes(
                    encoding=serialization.Encoding.DER,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption(),
                ),
            )
        else:
            # Load from file
            obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)

        # Parsers are tried in this order; the first one that accepts the DER
        # data wins. 'wrapped' tells whether the key sits in '.privateKey'.
        candidates = (
            (RSAPrivateKey_OpenSSL, True, PrivKeyRSA, "PRIVATE KEY"),
            (ECDSAPrivateKey_OpenSSL, True, PrivKeyECDSA, "EC PRIVATE KEY"),
            (RSAPrivateKey, False, PrivKeyRSA, "RSA PRIVATE KEY"),
            (ECDSAPrivateKey, False, PrivKeyECDSA, "EC PRIVATE KEY"),
            (EdDSAPrivateKey, False, PrivKeyEdDSA, "PRIVATE KEY"),
        )
        for parser, wrapped, target_cls, marker in candidates:
            try:
                privkey = parser(obj._der)
                if wrapped:
                    privkey = privkey.privateKey
                obj.__class__ = target_cls
                obj.marker = marker
            except Exception:
                continue
            break
        else:
            raise Exception("Unable to import private key")

        try:
            obj.import_from_asn1pkt(privkey)
        except ImportError:
            # Best effort: the object is returned even when the import step
            # reports an ImportError.
            pass
        return obj
628
629
class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING):
    """
    An ASN1_BIT_STRING that ignores BER encoding: its bytes form is its
    readable value.
    """

    def __bytes__(self):
        # Hand back the readable payload directly.
        return self.val_readable

    # str() goes through the same code path as bytes().
    __str__ = __bytes__
637
638
class PrivKey(metaclass=_PrivKeyFactory):
    """
    Parent class for PrivKeyRSA, PrivKeyECDSA and PrivKeyEdDSA.
    Provides common signTBSCert(), resignCert(), verifyCert(), verifyCsr()
    and export() methods.

    The cryptography private key is read from the 'key' attribute, the
    public key wrapper from 'pubkey' and the PEM label from 'marker'.
    """

    def signTBSCert(self, tbsCert, h="sha256"):
        """
        Sign 'tbsCert' and return the resulting X509_Cert.

        If 'h' is falsy, the hash is looked up from the signature algorithm
        found in the tbsCertificate.

        Note that this will always copy the signature field from the
        tbsCertificate into the signatureAlgorithm field of the result,
        regardless of the coherence between its contents (which might
        indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2).

        There is a small inheritance trick for the computation of sigVal
        below: in order to use a sign() method which would apply
        to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the
        subclasses accept any argument, be it from the RSA or ECDSA world,
        and then they keep the ones they're interested in.
        Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign().
        """
        sigAlg = tbsCert.signature
        h = h or hash_by_oid[sigAlg.algorithm.val]
        sigVal = self.sign(bytes(tbsCert), h=h, t="pkcs")
        c = X509_Cert()
        c.tbsCertificate = tbsCert
        c.signatureAlgorithm = sigAlg
        c.signatureValue = _Raw_ASN1_BIT_STRING(sigVal, readable=True)
        return c

    def resignCert(self, cert):
        """Rewrite the signature of either a Cert or an X509_Cert."""
        # h=None: use the hash named by the certificate's own algorithm.
        return self.signTBSCert(cert.tbsCertificate, h=None)

    def verifyCert(self, cert):
        """Verifies either a Cert or an X509_Cert."""
        return self.pubkey.verifyCert(cert)

    def verifyCsr(self, cert):
        """Verifies a CSR."""
        return self.pubkey.verifyCsr(cert)

    @property
    def pem(self):
        """PEM text (str) of the key, labelled with 'marker'."""
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        """DER bytes of the key, in unencrypted PKCS8 format."""
        return self.key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )

    def export(self, filename, fmt=None):
        """
        Export private key in 'fmt' format (DER or PEM) to file 'filename'

        :param filename: destination path.
        :param fmt: "DER" or "PEM". If None, "PEM" is used when the filename
            ends with ".pem", "DER" otherwise.
        :returns: the value returned by the file's write().
        :raises ValueError: if 'fmt' is neither "DER" nor "PEM".
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        # Serialize before opening the file, so that an unsupported format
        # never leaves an empty (truncated) file behind.
        if fmt == "DER":
            data = self.der
        elif fmt == "PEM":
            data = self.pem.encode()
        else:
            raise ValueError("Unknown format %r, should be DER or PEM" % (fmt,))
        with open(filename, "wb") as f:
            return f.write(data)

    @crypto_validator
    def sign(self, data, h="sha256", **kwargs):
        """
        Sign data.
        """
        raise NotImplementedError

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Verify signed data.
        """
        raise NotImplementedError
721
722
class PrivKeyRSA(PrivKey, _DecryptAndSignRSA):
    """
    RSA private key wrapper built on _DecryptAndSignRSA (crypto/pkcs1.py).
    The cryptography key object is stored in 'key', and the matching
    PubKeyRSA wrapper in 'pubkey'.
    """

    @crypto_validator
    def fill_and_store(
        self,
        modulus=None,
        modulusLen=None,
        pubExp=None,
        prime1=None,
        prime2=None,
        coefficient=None,
        exponent1=None,
        exponent2=None,
        privExp=None,
    ):
        """
        Set up the key from its full set of RSA parameters. If any of them
        (other than pubExp / modulusLen) is missing, a brand new key is
        generated instead (2048 bits unless modulusLen says otherwise).
        """
        pubExp = pubExp or 65537
        # note that the library requires every parameter
        # in order to call RSAPrivateNumbers(...)
        # if one of these is missing, we generate a whole new key
        components = (
            modulus,
            prime1,
            prime2,
            coefficient,
            privExp,
            exponent1,
            exponent2,
        )
        if None in components:
            real_modulusLen = modulusLen or 2048
            if _RSA_512_SUPPORTED or real_modulusLen >= 1024:
                self.key = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            else:
                # cryptography > 43.0 compatibility
                self.key = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
        else:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            public_part = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            private_part = rsa.RSAPrivateNumbers(
                p=prime1,
                q=prime2,
                dmp1=exponent1,
                dmq1=exponent2,
                iqmp=coefficient,
                d=privExp,
                public_numbers=public_part,
            )
            self.key = private_part.private_key(default_backend())
        pubkey = self.key.public_key()

        self.marker = "PRIVATE KEY"

        # Lines below are only useful for the legacy part of pkcs1.py
        stored = pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = stored.n
        self._pubExp = stored.e

        self.pubkey = PubKeyRSA((stored.e, stored.n, real_modulusLen))

    def import_from_asn1pkt(self, privkey):
        """Import from a parsed packet exposing the RSA private key fields."""
        self.fill_and_store(
            modulus=privkey.modulus.val,
            pubExp=privkey.publicExponent.val,
            privExp=privkey.privateExponent.val,
            prime1=privkey.prime1.val,
            prime2=privkey.prime2.val,
            exponent1=privkey.exponent1.val,
            exponent2=privkey.exponent2.val,
            coefficient=privkey.coefficient.val,
        )

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """Verify 'sig' over 'msg' with the associated public key."""
        options = dict(t=t, h=h, mgf=mgf, L=L)
        return self.pubkey.verify(msg=msg, sig=sig, **options)

    def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
        """Sign 'data' through _DecryptAndSignRSA.sign()."""
        return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L)
828
829
class PrivKeyECDSA(PrivKey):
    """
    ECDSA private key wrapper.
    The key object is stored in 'key', and the matching PubKeyECDSA wrapper
    in 'pubkey'.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key on 'curve' (a curve class, default SECP256R1)."""
        curve_cls = curve or ec.SECP256R1
        self.key = ec.generate_private_key(curve_cls(), default_backend())
        public = self.key.public_key()
        self.pubkey = PubKeyECDSA(cryptography_obj=public)
        self.marker = "EC PRIVATE KEY"

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the bytes of a parsed private key packet."""
        der = bytes(privkey)
        self.key = serialization.load_der_private_key(
            der, None, backend=default_backend()
        )
        public = self.key.public_key()
        self.pubkey = PubKeyECDSA(cryptography_obj=public)
        self.marker = "EC PRIVATE KEY"

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """Verify 'sig' over 'msg' with the associated public key."""
        return self.pubkey.verify(msg=msg, sig=sig, h=h, **kwargs)

    @crypto_validator
    def sign(self, data, h="sha256", **kwargs):
        """Sign 'data' using hash 'h'; extra keyword arguments are ignored."""
        algorithm = ec.ECDSA(_get_hash(h))
        return self.key.sign(data, algorithm)
858
859
class PrivKeyEdDSA(PrivKey):
    """
    Wrapper for EdDSA keys
    Use the 'key' attribute to access original object.
    The matching PubKeyEdDSA wrapper is stored in 'pubkey'.
    """

    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key using 'curve', a private key class providing
        generate().
        """
        # NOTE(review): the default is the X25519 private key class; confirm
        # that keys of this type support sign() as called below.
        curve = curve or x25519.X25519PrivateKey
        self.key = curve.generate()
        # Wrap with the EdDSA public class: PubKeyECDSA.verify() would pass
        # an ECDSA algorithm argument to the underlying key.
        self.pubkey = PubKeyEdDSA(cryptography_obj=self.key.public_key())
        self.marker = "PRIVATE KEY"

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the bytes of a parsed private key packet."""
        self.key = serialization.load_der_private_key(
            bytes(privkey), None, backend=default_backend()
        )
        self.pubkey = PubKeyEdDSA(cryptography_obj=self.key.public_key())
        self.marker = "PRIVATE KEY"

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """Verify 'sig' over 'msg' with the associated public key."""
        return self.pubkey.verify(msg=msg, sig=sig, **kwargs)

    @crypto_validator
    def sign(self, data, **kwargs):
        """Sign 'data'; keyword arguments are ignored."""
        return self.key.sign(data)
888
889
890################
891# Certificates #
892################
893
894
class _CertMaker(_PKIObjMaker):
    """
    Metaclass for Cert creation. Unlike for the keys it is not strictly
    needed, but the same model is reused rather than writing redundant
    constructors.
    """

    def __call__(cls, cert_path=None, cryptography_obj=None):
        """
        Build a Cert from a path / raw data (DER or PEM), or from a
        cryptography certificate object when 'cryptography_obj' is given.

        :raises Exception: if the data cannot be parsed as an X509_Cert
            (the parser's own exception when conf.debug_dissector is set).
        """
        if cryptography_obj is None:
            # Load from file
            obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CERT_SIZE, "CERTIFICATE")
        else:
            # This allows to import cryptography objects directly
            der = cryptography_obj.public_bytes(
                encoding=serialization.Encoding.DER,
            )
            obj = _PKIObj("DER", der)

        obj.__class__ = Cert
        obj.marker = "CERTIFICATE"
        try:
            parsed = X509_Cert(obj._der)
        except Exception:
            if conf.debug_dissector:
                raise
            raise Exception("Unable to import certificate")
        obj.import_from_asn1pkt(parsed)
        return obj
923
924
def _get_cert_sig_hashname(cert):
    """
    Look up, in hash_by_oid, the hash matching the signature algorithm
    declared in the tbsCertificate of 'cert'.
    """
    algorithm_oid = cert.tbsCertificate.signature.algorithm.val
    return hash_by_oid[algorithm_oid]
932
933
def _get_csr_sig_hashname(csr):
    """
    Look up, in hash_by_oid, the hash matching the signature algorithm
    declared in the certification request of 'csr'.
    """
    algorithm_oid = csr.certReq.signatureAlgorithm.algorithm.val
    return hash_by_oid[algorithm_oid]
941
942
943class Cert(metaclass=_CertMaker):
944 """
945 Wrapper for the X509_Cert from layers/x509.py.
946 Use the 'x509Cert' attribute to access original object.
947 """
948
949 def import_from_asn1pkt(self, cert):
950 error_msg = "Unable to import certificate"
951
952 self.x509Cert = cert
953
954 tbsCert = cert.tbsCertificate
955
956 if tbsCert.version:
957 self.version = tbsCert.version.val + 1
958 else:
959 self.version = 1
960 self.serial = tbsCert.serialNumber.val
961 self.sigAlg = tbsCert.signature.algorithm.oidname
962 self.issuer = tbsCert.get_issuer()
963 self.issuer_str = tbsCert.get_issuer_str()
964 self.issuer_hash = hash(self.issuer_str)
965 self.subject = tbsCert.get_subject()
966 self.subject_str = tbsCert.get_subject_str()
967 self.subject_hash = hash(self.subject_str)
968 self.authorityKeyID = None
969
970 self.notBefore_str = tbsCert.validity.not_before.pretty_time
971 try:
972 self.notBefore = tbsCert.validity.not_before.datetime.timetuple()
973 except ValueError:
974 raise Exception(error_msg)
975 self.notBefore_str_simple = time.strftime("%x", self.notBefore)
976
977 self.notAfter_str = tbsCert.validity.not_after.pretty_time
978 try:
979 self.notAfter = tbsCert.validity.not_after.datetime.timetuple()
980 except ValueError:
981 raise Exception(error_msg)
982 self.notAfter_str_simple = time.strftime("%x", self.notAfter)
983
984 self.pubkey = PubKey(bytes(tbsCert.subjectPublicKeyInfo))
985
986 if tbsCert.extensions:
987 for extn in tbsCert.extensions:
988 if extn.extnID.oidname == "basicConstraints":
989 self.cA = False
990 if extn.extnValue.cA:
991 self.cA = not (extn.extnValue.cA.val == 0)
992 elif extn.extnID.oidname == "keyUsage":
993 self.keyUsage = extn.extnValue.get_keyUsage()
994 elif extn.extnID.oidname == "extKeyUsage":
995 self.extKeyUsage = extn.extnValue.get_extendedKeyUsage()
996 elif extn.extnID.oidname == "authorityKeyIdentifier":
997 self.authorityKeyID = extn.extnValue.keyIdentifier.val
998
999 self.signatureValue = bytes(cert.signatureValue)
1000 self.signatureLen = len(self.signatureValue)
1001
1002 def isIssuer(self, other):
1003 """
1004 True if 'other' issued 'self', i.e.:
1005 - self.issuer == other.subject
1006 - self is signed by other
1007 """
1008 if self.issuer_hash != other.subject_hash:
1009 return False
1010 return other.pubkey.verifyCert(self)
1011
1012 def isIssuerCert(self, other):
1013 return self.isIssuer(other)
1014
1015 def isSelfSigned(self):
1016 """
1017 Return True if the certificate is self-signed:
1018 - issuer and subject are the same
1019 - the signature of the certificate is valid.
1020 """
1021 if self.issuer_hash == self.subject_hash:
1022 return self.isIssuer(self)
1023 return False
1024
1025 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
1026 # no ECDSA *encryption* support, hence only RSA specific keywords here
1027 return self.pubkey.encrypt(msg, t=t, h=h, mgf=mgf, L=L)
1028
1029 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
1030 return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)
1031
1032 def getSignatureHash(self):
1033 """
1034 Return the hash cryptography object used by the 'signatureAlgorithm'
1035 """
1036 return _get_hash(_get_cert_sig_hashname(self))
1037
1038 def setSubjectPublicKeyFromPrivateKey(self, key):
1039 """
1040 Replace the subjectPublicKeyInfo of this certificate with the one from
1041 the provided key.
1042 """
1043 if isinstance(key, (PubKey, PrivKey)):
1044 if isinstance(key, PrivKey):
1045 pubkey = key.pubkey
1046 else:
1047 pubkey = key
1048 self.tbsCertificate.subjectPublicKeyInfo = X509_SubjectPublicKeyInfo(
1049 pubkey.der
1050 )
1051 else:
1052 raise ValueError("Unknown type 'key', should be PubKey or PrivKey")
1053
1054 def resignWith(self, key):
1055 """
1056 Resign a certificate with a specific key
1057 """
1058 self.import_from_asn1pkt(key.resignCert(self))
1059
1060 def remainingDays(self, now=None):
1061 """
1062 Based on the value of notAfter field, returns the number of
1063 days the certificate will still be valid. The date used for the
1064 comparison is the current and local date, as returned by
1065 time.localtime(), except if 'now' argument is provided another
1066 one. 'now' argument can be given as either a time tuple or a string
1067 representing the date. Accepted format for the string version
1068 are:
1069
1070 - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
1071 - '%m/%d/%y' e.g. '01/30/08' (less precise)
1072
1073 If the certificate is no more valid at the date considered, then
1074 a negative value is returned representing the number of days
1075 since it has expired.
1076
1077 The number of days is returned as a float to deal with the unlikely
1078 case of certificates that are still just valid.
1079 """
1080 if now is None:
1081 now = time.localtime()
1082 elif isinstance(now, str):
1083 try:
1084 if "/" in now:
1085 now = time.strptime(now, "%m/%d/%y")
1086 else:
1087 now = time.strptime(now, "%b %d %H:%M:%S %Y %Z")
1088 except Exception:
1089 warning("Bad time string provided, will use localtime() instead.")
1090 now = time.localtime()
1091
1092 now = time.mktime(now)
1093 nft = time.mktime(self.notAfter)
1094 diff = (nft - now) / (24.0 * 3600)
1095 return diff
1096
1097 def isRevoked(self, crl_list):
1098 """
1099 Given a list of trusted CRL (their signature has already been
1100 verified with trusted anchors), this function returns True if
1101 the certificate is marked as revoked by one of those CRL.
1102
1103 Note that if the Certificate was on hold in a previous CRL and
1104 is now valid again in a new CRL and bot are in the list, it
1105 will be considered revoked: this is because _all_ CRLs are
1106 checked (not only the freshest) and revocation status is not
1107 handled.
1108
1109 Also note that the check on the issuer is performed on the
1110 Authority Key Identifier if available in _both_ the CRL and the
1111 Cert. Otherwise, the issuers are simply compared.
1112 """
1113 for c in crl_list:
1114 if (
1115 self.authorityKeyID is not None
1116 and c.authorityKeyID is not None
1117 and self.authorityKeyID == c.authorityKeyID
1118 ):
1119 return self.serial in (x[0] for x in c.revoked_cert_serials)
1120 elif self.issuer == c.issuer:
1121 return self.serial in (x[0] for x in c.revoked_cert_serials)
1122 return False
1123
1124 @property
1125 def tbsCertificate(self):
1126 return self.x509Cert.tbsCertificate
1127
1128 @property
1129 def pem(self):
1130 return der2pem(self.der, self.marker)
1131
1132 @property
1133 def der(self):
1134 return bytes(self.x509Cert)
1135
1136 @property
1137 def pubKey(self):
1138 warnings.warn(
1139 "Cert.pubKey is deprecated and will be removed in a future version. "
1140 "Use Cert.pubkey",
1141 DeprecationWarning,
1142 )
1143 return self.pubkey
1144
1145 def __eq__(self, other):
1146 return self.der == other.der
1147
1148 def __hash__(self):
1149 return hash(self.der)
1150
1151 def export(self, filename, fmt=None):
1152 """
1153 Export certificate in 'fmt' format (DER or PEM) to file 'filename'
1154 """
1155 if fmt is None:
1156 if filename.endswith(".pem"):
1157 fmt = "PEM"
1158 else:
1159 fmt = "DER"
1160 with open(filename, "wb") as f:
1161 if fmt == "DER":
1162 return f.write(self.der)
1163 elif fmt == "PEM":
1164 return f.write(self.pem.encode())
1165
1166 def show(self):
1167 print("Serial: %s" % self.serial)
1168 print("Issuer: " + self.issuer_str)
1169 print("Subject: " + self.subject_str)
1170 print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str))
1171
1172 def __repr__(self):
1173 return "[X.509 Cert. Subject:%s, Issuer:%s]" % (
1174 self.subject_str,
1175 self.issuer_str,
1176 )
1177
1178
1179################################
1180# Certificate Revocation Lists #
1181################################
1182
1183
class _CRLMaker(_PKIObjMaker):
    """
    Metaclass for CRL creation. It is not necessary as it was for the keys,
    but we reuse the model instead of creating redundant constructors.
    """

    def __call__(cls, cert_path):
        """
        Build a CRL object from 'cert_path'.

        The generic loading is delegated to _PKIObjMaker, then the DER bytes
        are parsed as a X509_CRL and imported into the new object.

        :raises Exception: if the DER bytes cannot be parsed as a X509_CRL.
        """
        new_crl = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL")
        new_crl.__class__ = CRL
        try:
            parsed = X509_CRL(new_crl._der)
        except Exception:
            raise Exception("Unable to import CRL")
        new_crl.import_from_asn1pkt(parsed)
        return new_crl
1199
1200
class CRL(metaclass=_CRLMaker):
    """
    Wrapper for the X509_CRL from layers/x509.py.
    Use the 'x509CRL' attribute to access original object.
    """

    @staticmethod
    def _parse_time(value):
        """
        Parse a 'YYMMDDHHMMSS' time string (an optional trailing 'Z' is
        dropped first) and return a (struct_time, stripped string) pair.

        :raises Exception: if the string does not match the expected format.
        """
        # Slicing (rather than indexing) keeps an empty string from raising
        # IndexError: such a value is then rejected by strptime below.
        if value[-1:] == "Z":
            value = value[:-1]
        try:
            parsed = time.strptime(value, "%y%m%d%H%M%S")
        except Exception:
            raise Exception("Unable to import CRL")
        return parsed, value

    def import_from_asn1pkt(self, crl):
        """
        Populate the attributes of this object from a parsed X509_CRL.

        :param crl: the X509_CRL packet to wrap.
        :raises Exception: if one of the dates of the CRL cannot be parsed.
        """
        self.x509CRL = crl

        tbsCertList = crl.tbsCertList
        self.tbsCertList = bytes(tbsCertList)

        # An absent version field means version 1
        if tbsCertList.version:
            self.version = tbsCertList.version.val + 1
        else:
            self.version = 1
        self.sigAlg = tbsCertList.signature.algorithm.oidname
        self.issuer = tbsCertList.get_issuer()
        self.issuer_str = tbsCertList.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)

        self.lastUpdate_str = tbsCertList.this_update.pretty_time
        self.lastUpdate, _ = self._parse_time(tbsCertList.this_update.val)
        self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)

        # nextUpdate is optional: always define the three attributes so that
        # show() and other readers never hit an AttributeError.
        self.nextUpdate = None
        self.nextUpdate_str = None
        self.nextUpdate_str_simple = None
        if tbsCertList.next_update:
            self.nextUpdate_str = tbsCertList.next_update.pretty_time
            self.nextUpdate, _ = self._parse_time(tbsCertList.next_update.val)
            self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)

        # None when the CRL carries no cRLNumber extension
        self.number = None
        # Cert.isRevoked() reads 'authorityKeyID' on CRL objects. No CRL
        # extension is parsed into it here, so it defaults to None and the
        # issuer comparison is used instead.
        self.authorityKeyID = None
        if tbsCertList.crlExtensions:
            for extension in tbsCertList.crlExtensions:
                if extension.extnID.oidname == "cRLNumber":
                    self.number = extension.extnValue.cRLNumber.val

        # List of (serial number, revocation date string) tuples
        revoked = []
        if tbsCertList.revokedCertificates:
            for cert in tbsCertList.revokedCertificates:
                serial = cert.serialNumber.val
                # The date is validated, but kept in its string form
                _, date = self._parse_time(cert.revocationDate.val)
                revoked.append((serial, date))
        self.revoked_cert_serials = revoked

        self.signatureValue = bytes(crl.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuer(self, other):
        """
        True if 'other' issued this CRL, i.e. the issuer of the CRL matches
        the subject of 'other' and the signature check by the public key of
        'other' succeeds.
        """
        # This is exactly the same thing as in Cert method.
        if self.issuer_hash != other.subject_hash:
            return False
        return other.pubkey.verifyCert(self)

    def verify(self, anchors):
        """
        Return True iff the CRL is signed by one of the provided anchors.
        """
        return any(self.isIssuer(a) for a in anchors)

    def show(self):
        """
        Print the version, signature algorithm, issuer and update dates of
        the CRL ('nextUpdate' shows as None when the CRL has none).
        """
        print("Version: %d" % self.version)
        print("sigAlg: " + self.sigAlg)
        print("Issuer: " + self.issuer_str)
        print("lastUpdate: %s" % self.lastUpdate_str)
        print("nextUpdate: %s" % self.nextUpdate_str)
1285
1286
1287###############################
1288# Certificate Signing Request #
1289###############################
1290
1291
class _CSRMaker(_PKIObjMaker):
    """
    Metaclass for CSR creation. It is not necessary as it was for the keys,
    but we reuse the model instead of creating redundant constructors.
    """

    def __call__(cls, cert_path):
        """
        Build a CSR object from 'cert_path'.

        The DER bytes are first parsed as a PKCS#10 request, then as a CMC
        (CMS ContentInfo) request if that fails.

        :raises Exception: if neither parser accepts the data.
        """
        new_csr = _PKIObjMaker.__call__(cls, cert_path, _MAX_CSR_SIZE)
        new_csr.__class__ = CSR

        def _load(parser, fmt):
            # Parse first: marker and format are only set on success
            parsed = parser(new_csr._der)
            new_csr.marker = "NEW CERTIFICATE REQUEST"
            new_csr.fmt = fmt
            return parsed

        try:
            # PKCS#10 format
            request = _load(PKCS10_CertificationRequest, CSR.FORMAT.PKCS10)
        except Exception:
            try:
                # CMC format
                request = _load(CMS_ContentInfo, CSR.FORMAT.CMC)
            except Exception:
                raise Exception("Unable to import CSR")

        new_csr.import_from_asn1pkt(request)
        return new_csr
1317
1318
class CSR(metaclass=_CSRMaker):
    """
    Wrapper for the CSR formats.
    This can handle both PKCS#10 and CMC formats.
    """

    class FORMAT(enum.Enum):
        """
        The format used by the CSR.
        """

        PKCS10 = "PKCS#10"
        CMC = "CMC"

    def import_from_asn1pkt(self, csr):
        """
        Populate the attributes of this object from a parsed request.

        :param csr: the parsed request, in the format announced by 'fmt'.
        """
        self.csr = csr
        certReqInfo = self.certReq.certificationRequestInfo

        # Subject
        self.subject = certReqInfo.get_subject()
        self.subject_str = certReqInfo.get_subject_str()
        self.subject_hash = hash(self.subject_str)

        # pubkey
        self.pubkey = PubKey(bytes(certReqInfo.subjectPublicKeyInfo))

        # Get the "subjectKeyIdentifier" from the "extensionRequest" attribute
        try:
            extReq = next(
                x.values[0].value
                for x in certReqInfo.attributes
                if x.type.val == "1.2.840.113549.1.9.14"  # extensionRequest
            )
            self.sid = next(
                x.extnValue.keyIdentifier
                for x in extReq.extensions
                if x.extnID.val == "2.5.29.14"  # subjectKeyIdentifier
            )
        except StopIteration:
            # No extensionRequest attribute, or no subjectKeyIdentifier in it
            self.sid = None

    @property
    def certReq(self):
        """
        The certification request itself, unwrapped from its CMC envelope
        when needed.

        :raises ValueError: on a bad CMC wrapping or an unknown format.
        """
        csr = self.csr

        if self.fmt == CSR.FORMAT.PKCS10:
            return csr
        elif self.fmt == CSR.FORMAT.CMC:
            if (
                csr.contentType.oidname != "id-signedData"
                or csr.content.encapContentInfo.eContentType.oidname != "id-cct-PKIData"
            ):
                raise ValueError("Invalid CMC wrapping !")
            # Only the first request of the sequence is considered
            req = csr.content.encapContentInfo.eContent.reqSequence[0]
            return req.request.certificationRequest
        else:
            raise ValueError("Invalid CSR format !")

    @property
    def pem(self):
        """
        PEM form of the request, built by der2pem() with this object's marker.
        """
        return der2pem(self.der, self.marker)

    @property
    def der(self):
        """
        Bytes obtained by building the wrapped request.
        """
        return bytes(self.csr)

    def __eq__(self, other):
        """
        Two objects are equal when their DER encodings are identical.

        NotImplemented is returned when 'other' exposes no 'der' attribute,
        so that comparing with an unrelated object evaluates to False instead
        of raising AttributeError.
        """
        try:
            other_der = other.der
        except AttributeError:
            return NotImplemented
        return self.der == other_der

    def __hash__(self):
        return hash(self.der)

    def isIssuer(self, other):
        """
        True if 'other' carries the same 'sid' as this request.

        Objects that have no 'sid' attribute never match.
        """
        return hasattr(other, "sid") and other.sid == self.sid

    def isSelfSigned(self):
        """
        Always True for a CSR.
        """
        return True

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        """
        Check the signature 'sig' of 'msg' with the public key of the request.
        """
        return self.pubkey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)

    def export(self, filename, fmt=None):
        """
        Export the request in 'fmt' format (DER or PEM) to file 'filename'.

        :param filename: path of the file to write.
        :param fmt: "DER" or "PEM" (case-insensitive). When None, PEM is
            selected if 'filename' ends with ".pem", DER otherwise.
        :returns: the number of bytes written.
        :raises ValueError: if 'fmt' is neither DER nor PEM.
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        fmt = fmt.upper()

        # Build the payload (and reject bad formats) before opening the file:
        # mode "wb" truncates, so failing afterwards would leave an empty file.
        if fmt == "DER":
            data = self.der
        elif fmt == "PEM":
            data = self.pem.encode()
        else:
            raise ValueError("Unknown export format '%s', should be DER or PEM" % fmt)

        with open(filename, "wb") as f:
            return f.write(data)

    def show(self):
        """
        Print the subject and the attribute types of the request.
        """
        certReqInfo = self.certReq.certificationRequestInfo

        print("Subject: " + self.subject_str)
        print("Attributes:")
        for attr in certReqInfo.attributes:
            print(" - %s" % attr.type.oidname)

    def verifySelf(self) -> bool:
        """
        Verify the signatures of the CSR.

        For CMC, the CMS envelope is verified first (trusting the request
        itself), then the signature of the inner request. For PKCS#10 only
        the request signature is checked.

        :returns: True if every check passes, False otherwise.
        """
        if self.fmt == self.FORMAT.CMC:
            try:
                cms_engine = CMS_Engine([self])
                cms_engine.verify(self.csr)
                return self.pubkey.verifyCsr(self)
            except (ValueError, KeyError):
                # KeyError is what CertList.findCertBySid() raises when no
                # entry matches the signer: a failed verification, not a crash
                # (this method is also called by __repr__).
                return False
        elif self.fmt == self.FORMAT.PKCS10:
            return self.pubkey.verifyCsr(self)
        else:
            return False

    def __repr__(self):
        # Note that this runs a full signature verification
        return "[CSR Format: %s, Subject:%s, Verified: %s]" % (
            self.fmt.value,
            self.subject_str,
            self.verifySelf(),
        )
1445
1446
1447####################
1448# Certificate list #
1449####################
1450
1451
class CertList(list):
    """
    An object that can store a list of Cert objects, load them and export them
    into DER/PEM format.
    """

    def __init__(
        self,
        certList: Union[Self, List[Cert], List[CSR], Cert, CSR, str],
    ):
        """
        Construct a list of certificates.

        :param certList: a path to a PEM/DER file holding one or several
            certificates, a single Cert or CSR object, or an iterable of them.
        """
        # Parse the certificate list / CA
        if isinstance(certList, str):
            # It's a path. First get the _PKIObj
            obj = _PKIObjMaker.__call__(
                CertList, certList, _MAX_CERT_SIZE, "CERTIFICATE"
            )

            # Then parse the der until there's nothing left
            certList = []
            payload = obj._der
            while payload:
                cert = X509_Cert(payload)
                # Whatever follows the certificate was dissected as a raw
                # payload: it holds the next certificate(s), if any.
                if conf.raw_layer in cert.payload:
                    payload = cert.payload.load
                else:
                    payload = None
                cert.remove_payload()
                certList.append(Cert(cert))

            self.frmt = obj.frmt
        elif isinstance(certList, (Cert, CSR)):
            # A single object: wrap it, as neither Cert nor CSR is iterable
            certList = [certList]
            self.frmt = "PEM"
        else:
            self.frmt = "PEM"

        super(CertList, self).__init__(certList)

    def findCertBySid(self, sid):
        """
        Find a certificate in the list by SubjectIDentifier.

        :param sid: a CMS_IssuerAndSerialNumber (matched against the issuer
            and serial number of Cert entries) or a CMS_SubjectKeyIdentifier
            (matched against the 'sid' of CSR entries).
        :raises KeyError: if no entry matches.
        """
        for cert in self:
            if isinstance(cert, Cert) and isinstance(sid, CMS_IssuerAndSerialNumber):
                # The issuer alone does not identify a certificate: several
                # entries may come from the same CA, so match the serial too.
                serial = getattr(sid.serialNumber, "val", sid.serialNumber)
                if cert.issuer == sid.get_issuer() and cert.serial == serial:
                    return cert
            elif isinstance(cert, CSR) and isinstance(sid, CMS_SubjectKeyIdentifier):
                if cert.sid == sid.sid:
                    return cert
        raise KeyError("Certificate not found !")

    def export(self, filename, fmt=None):
        """
        Export a list of certificates in 'fmt' format (DER or PEM) to file
        'filename'.

        :param filename: path of the file to write.
        :param fmt: "DER" or "PEM" (case-insensitive). When None, PEM is
            selected if 'filename' ends with ".pem", DER otherwise.
        :returns: the number of bytes written.
        :raises ValueError: if 'fmt' is neither DER nor PEM.
        """
        if fmt is None:
            fmt = "PEM" if filename.endswith(".pem") else "DER"
        fmt = fmt.upper()

        # Build the payload (and reject bad formats) before opening the file:
        # mode "wb" truncates, so failing afterwards would leave an empty file.
        if fmt == "DER":
            data = self.der
        elif fmt == "PEM":
            data = self.pem.encode()
        else:
            raise ValueError("Unknown export format '%s', should be DER or PEM" % fmt)

        with open(filename, "wb") as f:
            return f.write(data)

    @property
    def der(self):
        """
        Concatenation of the DER encodings of all the entries.
        """
        return b"".join(x.der for x in self)

    @property
    def pem(self):
        """
        Concatenation of the PEM encodings of all the entries.
        """
        return "".join(x.pem for x in self)

    def __repr__(self):
        return "<CertList %s certificates>" % (len(self),)

    def show(self):
        """
        Print every entry, prefixed with its index in the list.
        """
        for i, c in enumerate(self):
            print(conf.color_theme.id(i, fmt="%04i"), end=" ")
            print(repr(c))
1536
1537
1538######################
1539# Certificate chains #
1540######################
1541
1542
class CertTree(CertList):
    """
    An extension to CertList that additionally has a list of ROOT CAs
    that are trusted.

    Example::

        >>> tree = CertTree("ca_chain.pem")
        >>> tree.show()
        /CN=DOMAIN-DC1-CA/dc=DOMAIN [Self Signed]
         /CN=Administrator/dc=DOMAIN [Not Self Signed]
    """

    __slots__ = ["frmt", "rootCAs"]

    def __init__(
        self,
        certList: Union[List[Cert], CertList, str],
        rootCAs: Union[List[Cert], CertList, Cert, str, None] = None,
    ):
        """
        Construct a chain of certificates that follows issuer/subject matching and
        respects signature validity.

        Note that we do not check AKID/{SKID/issuer/serial} matching,
        nor the presence of keyCertSign in keyUsage extension (if present).

        :param certList: a list of Cert/CRL objects (or path to PEM/DER file containing
                         multiple certs/CRL) to try to chain.
        :param rootCAs: (optional) a list of certificates to trust. If not provided,
                        trusts any self-signed certificates from the certList.
        """
        # Parse the certificate list
        certList = CertList(certList)

        # Find the ROOT CAs if store isn't specified
        if not rootCAs:
            # Build cert store.
            self.rootCAs = CertList([x for x in certList if x.isSelfSigned()])
            # And remove those certs from the list
            for cert in self.rootCAs:
                certList.remove(cert)
        else:
            # Store cert store.
            self.rootCAs = CertList(rootCAs)
            # And remove those certs from the list if present (remove dups)
            for cert in self.rootCAs:
                if cert in certList:
                    certList.remove(cert)

        # Append our root CAs to the certList
        certList.extend(self.rootCAs)

        # Super instantiate
        super(CertTree, self).__init__(certList)

    @property
    def tree(self):
        """
        Get a tree-like object of the certificate list

        :returns: a list of (certificate, children) tuples, one per ROOT CA,
            where 'children' is itself a list of such tuples.
        """
        # The tree is a list of (cert, children) nodes, rooted at the ROOT CAs
        tree = [(x, []) for x in self.rootCAs]

        # Certificates not attached to the tree yet
        remaining = list(self)

        # We make a list of certificates we have to search children for, and iterate
        # through it until it's empty.
        todo = list(tree)

        # Iterate
        while todo:
            cert, children = todo.pop()
            # Walk a snapshot: 'remaining' shrinks inside the loop, and
            # removing from the list being iterated would skip the element
            # that follows each match (dropping siblings from the tree).
            for c in list(remaining):
                # Check if this certificate matches the one we're looking at
                if c.isIssuer(cert) and c != cert:
                    item = (c, [])
                    children.append(item)
                    remaining.remove(c)
                    todo.append(item)

        return tree

    def getchain(self, cert):
        """
        Return a chain of certificate that points from a ROOT CA to a certificate.

        :returns: a CertTree built from the chain, or None if there is none.
        """

        def _rec_getchain(chain, curtree):
            # See if an element of the current tree signs the cert, if so add it to
            # the chain, else recurse.
            for c, subtree in curtree:
                curchain = chain + [c]
                # If 'cert' is issued by c
                if cert.isIssuer(c):
                    # Final node of the chain !
                    # (add the final cert if not self signed)
                    if c != cert:
                        curchain += [cert]
                    return curchain
                else:
                    # Not the final node of the chain ! Recurse.
                    curchain = _rec_getchain(curchain, subtree)
                    if curchain:
                        return curchain
            return None

        chain = _rec_getchain([], self.tree)
        if chain is not None:
            return CertTree(chain)
        else:
            return None

    def verify(self, cert):
        """
        Verify that a certificate is properly signed.

        :raises ValueError: if no chain from a ROOT CA to 'cert' is found.
        """
        # Check that we can find a chain to this certificate
        if not self.getchain(cert):
            raise ValueError("Certificate verification failed !")

    def show(self, ret: bool = False):
        """
        Return the CertTree as a string certificate tree

        :param ret: if True, return the string instead of printing it.
        """

        def _rec_show(c, children, lvl=0):
            s = ""
            # Process the current CA
            if c:
                if not c.isSelfSigned():
                    s += "%s [Not Self Signed]\n" % c.subject_str
                else:
                    s += "%s [Self Signed]\n" % c.subject_str
                s = lvl * " " + s
                lvl += 1
            # Process all sub-CAs at a lower level
            for child, subchildren in children:
                s += _rec_show(child, subchildren, lvl=lvl)
            return s

        # The ROOT CAs hang below a virtual, certificate-less top node
        showed = _rec_show(None, self.tree)
        if ret:
            return showed
        else:
            print(showed)

    def __repr__(self):
        return "<CertTree %s certificates (%s ROOT CA)>" % (
            len(self),
            len(self.rootCAs),
        )
1696
1697
1698#######
1699# CMS #
1700#######
1701
1702# RFC3852
1703
1704
1705class CMS_Engine:
1706 """
1707 A utility class to perform CMS/PKCS7 operations, as specified by RFC3852.
1708
1709 :param store: a ROOT CA certificate list to trust.
1710 :param crls: a list of CRLs to include. This is currently not checked.
1711 """
1712
1713 def __init__(
1714 self,
1715 store: CertList,
1716 crls: List[X509_CRL] = [],
1717 ):
1718 self.store = store
1719 self.crls = crls
1720
1721 def sign(
1722 self,
1723 message: Union[bytes, Packet],
1724 eContentType: ASN1_OID,
1725 cert: Cert,
1726 key: PrivKey,
1727 h: Optional[str] = None,
1728 ):
1729 """
1730 Sign a message using CMS.
1731
1732 :param message: the inner content to sign.
1733 :param eContentType: the OID of the inner content.
1734 :param cert: the certificate whose key to use use for signing.
1735 :param key: the private key to use for signing.
1736 :param h: the hash to use (default: same as the certificate's signature)
1737
1738 We currently only support X.509 certificates !
1739 """
1740 # RFC3852 - 5.4. Message Digest Calculation Process
1741 h = h or _get_cert_sig_hashname(cert)
1742 hash = hashes.Hash(_get_hash(h))
1743 hash.update(bytes(message))
1744 hashed_message = hash.finalize()
1745
1746 # 5.5. Signature Generation Process
1747 signerInfo = CMS_SignerInfo(
1748 version=1,
1749 sid=CMS_IssuerAndSerialNumber(
1750 issuer=cert.tbsCertificate.issuer,
1751 serialNumber=cert.tbsCertificate.serialNumber,
1752 ),
1753 digestAlgorithm=X509_AlgorithmIdentifier(
1754 algorithm=ASN1_OID(h),
1755 parameters=ASN1_NULL(0),
1756 ),
1757 signedAttrs=[
1758 X509_Attribute(
1759 type=ASN1_OID("contentType"),
1760 values=[
1761 X509_AttributeValue(value=eContentType),
1762 ],
1763 ),
1764 X509_Attribute(
1765 type=ASN1_OID("messageDigest"),
1766 # "A message-digest attribute MUST have a single attribute value"
1767 values=[
1768 X509_AttributeValue(value=ASN1_STRING(hashed_message)),
1769 ],
1770 ),
1771 ],
1772 signatureAlgorithm=cert.tbsCertificate.signature,
1773 )
1774 signerInfo.signature = ASN1_STRING(
1775 key.sign(
1776 bytes(
1777 CMS_SignedAttrsForSignature(
1778 signedAttrs=signerInfo.signedAttrs,
1779 )
1780 ),
1781 h=h,
1782 )
1783 )
1784
1785 # Build a chain of X509_Cert to ship (but skip the ROOT certificate)
1786 certTree = CertTree(cert, self.store)
1787 certificates = [x.x509Cert for x in certTree if not x.isSelfSigned()]
1788
1789 # Build final structure
1790 return CMS_ContentInfo(
1791 contentType=ASN1_OID("id-signedData"),
1792 content=CMS_SignedData(
1793 version=3 if certificates else 1,
1794 digestAlgorithms=X509_AlgorithmIdentifier(
1795 algorithm=ASN1_OID(h),
1796 parameters=ASN1_NULL(0),
1797 ),
1798 encapContentInfo=CMS_EncapsulatedContentInfo(
1799 eContentType=eContentType,
1800 eContent=message,
1801 ),
1802 certificates=(
1803 [CMS_CertificateChoices(certificate=cert) for cert in certificates]
1804 if certificates
1805 else None
1806 ),
1807 crls=(
1808 [CMS_RevocationInfoChoice(crl=crl) for crl in self.crls]
1809 if self.crls
1810 else None
1811 ),
1812 signerInfos=[
1813 signerInfo,
1814 ],
1815 ),
1816 )
1817
1818 def verify(
1819 self,
1820 contentInfo: CMS_ContentInfo,
1821 eContentType: Optional[ASN1_OID] = None,
1822 ):
1823 """
1824 Verify a CMS message against the list of trusted certificates,
1825 and return the unpacked message if the verification succeeds.
1826
1827 :param contentInfo: the ContentInfo whose signature to verify
1828 :param eContentType: if provided, verifies that the content type is valid
1829 """
1830 if contentInfo.contentType.oidname != "id-signedData":
1831 raise ValueError("ContentInfo isn't signed !")
1832
1833 signeddata = contentInfo.content
1834
1835 # Build the certificate chain
1836 certificates = []
1837 if signeddata.certificates:
1838 certificates = [Cert(x.certificate) for x in signeddata.certificates]
1839 certTree = CertTree(certificates, self.store)
1840
1841 # Check there's at least one signature
1842 if not signeddata.signerInfos:
1843 raise ValueError("ContentInfo contained no signature !")
1844
1845 # Check all signatures
1846 for signerInfo in signeddata.signerInfos:
1847 # Find certificate in the chain that did this
1848 cert: Cert = certTree.findCertBySid(signerInfo.sid)
1849
1850 # Verify certificate signature
1851 certTree.verify(cert)
1852
1853 # Verify the message hash
1854 if signerInfo.signedAttrs:
1855 # Verify the contentType
1856 try:
1857 contentType = next(
1858 x.values[0].value
1859 for x in signerInfo.signedAttrs
1860 if x.type.oidname == "contentType"
1861 )
1862
1863 if contentType != signeddata.encapContentInfo.eContentType:
1864 raise ValueError(
1865 "Inconsistent 'contentType' was detected in packet !"
1866 )
1867
1868 if eContentType is not None and eContentType != contentType:
1869 raise ValueError(
1870 "Expected '%s' but got '%s' contentType !"
1871 % (
1872 eContentType,
1873 contentType,
1874 )
1875 )
1876 except StopIteration:
1877 raise ValueError("Missing contentType in signedAttrs !")
1878
1879 # Verify the messageDigest value
1880 try:
1881 # "A message-digest attribute MUST have a single attribute value"
1882 messageDigest = next(
1883 x.values[0].value
1884 for x in signerInfo.signedAttrs
1885 if x.type.oidname == "messageDigest"
1886 )
1887
1888 # Re-calculate hash
1889 h = signerInfo.digestAlgorithm.algorithm.oidname
1890 hash = hashes.Hash(_get_hash(h))
1891 hash.update(bytes(signeddata.encapContentInfo.eContent))
1892 hashed_message = hash.finalize()
1893
1894 if hashed_message != messageDigest:
1895 raise ValueError("Invalid messageDigest value !")
1896 except StopIteration:
1897 raise ValueError("Missing messageDigest in signedAttrs !")
1898
1899 # Verify the signature
1900 cert.verify(
1901 msg=bytes(
1902 CMS_SignedAttrsForSignature(
1903 signedAttrs=signerInfo.signedAttrs,
1904 )
1905 ),
1906 sig=signerInfo.signature.val,
1907 )
1908 else:
1909 cert.verify(
1910 msg=bytes(signeddata.encapContentInfo),
1911 sig=signerInfo.signature.val,
1912 )
1913
1914 # Return the content
1915 return signeddata.encapContentInfo.eContent