1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net>
5# <arno@natisbad.org>
6# 2015, 2016, 2017 Maxence Tury <maxence.tury@ssi.gouv.fr>
7
8"""
9High-level methods for PKI objects (X.509 certificates, CRLs, asymmetric keys).
10Supports both RSA and ECDSA objects.
11
12The classes below are wrappers for the ASN.1 objects defined in x509.py.
13For instance, here is what you could do in order to modify the subject public
14key info of a 'cert' and then resign it with whatever 'key'::
15
16 from scapy.layers.tls.cert import *
17 cert = Cert("cert.der")
18 k = PrivKeyRSA() # generate a private key
19 cert.setSubjectPublicKeyFromPrivateKey(k)
20 cert.resignWith(k)
21 cert.export("newcert.pem")
22 k.export("mykey.pem")
23
24One could also edit arguments like the serial number, as such::
25
26 from scapy.layers.tls.cert import *
27 c = Cert("mycert.pem")
28 c.tbsCertificate.serialNumber = 0x4B1D
29 k = PrivKey("mykey.pem") # import an existing private key
30 c.resignWith(k)
31 c.export("newcert.pem")
32
33To export the public key of a private key::
34
35 k = PrivKey("mykey.pem")
36 k.pubkey.export("mypubkey.pem")
37
38No need for obnoxious openssl tweaking anymore. :)
39"""
40
41import base64
42import os
43import time
44
45from scapy.config import conf, crypto_validator
46from scapy.error import warning
47from scapy.utils import binrepr
48from scapy.asn1.asn1 import ASN1_BIT_STRING
49from scapy.asn1.mib import hash_by_oid
50from scapy.layers.x509 import (
51 ECDSAPrivateKey_OpenSSL,
52 ECDSAPrivateKey,
53 ECDSAPublicKey,
54 EdDSAPublicKey,
55 EdDSAPrivateKey,
56 RSAPrivateKey_OpenSSL,
57 RSAPrivateKey,
58 RSAPublicKey,
59 X509_Cert,
60 X509_CRL,
61 X509_SubjectPublicKeyInfo,
62)
63from scapy.layers.tls.crypto.pkcs1 import pkcs_os2ip, _get_hash, \
64 _EncryptAndVerifyRSA, _DecryptAndSignRSA
65from scapy.compat import raw, bytes_encode
66
67if conf.crypto_valid:
68 from cryptography.exceptions import InvalidSignature
69 from cryptography.hazmat.backends import default_backend
70 from cryptography.hazmat.primitives import serialization
71 from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519
72
73 # cryptography raised the minimum RSA key length to 1024 in 43.0+
74 # https://github.com/pyca/cryptography/pull/10278
75 # but we need still 512 for EXPORT40 ciphers (yes EXPORT is terrible)
76 # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66
77 # The following detects the change and hacks around it using the backend
78
79 try:
80 rsa.generate_private_key(public_exponent=65537, key_size=512)
81 _RSA_512_SUPPORTED = True
82 except ValueError:
83 # cryptography > 43.0
84 _RSA_512_SUPPORTED = False
85 from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl
86
87
88# Maximum allowed size in bytes for a certificate file, to avoid
89# loading huge file when importing a cert
90_MAX_KEY_SIZE = 50 * 1024
91_MAX_CERT_SIZE = 50 * 1024
92_MAX_CRL_SIZE = 10 * 1024 * 1024 # some are that big
93
94
95#####################################################################
96# Some helpers
97#####################################################################
98
99@conf.commands.register
100def der2pem(der_string, obj="UNKNOWN"):
101 """Convert DER octet string to PEM format (with optional header)"""
102 # Encode a byte string in PEM format. Header advertises <obj> type.
103 pem_string = "-----BEGIN %s-----\n" % obj
104 base64_string = base64.b64encode(der_string).decode()
105 chunks = [base64_string[i:i + 64] for i in range(0, len(base64_string), 64)] # noqa: E501
106 pem_string += '\n'.join(chunks)
107 pem_string += "\n-----END %s-----\n" % obj
108 return pem_string
109
110
111@conf.commands.register
112def pem2der(pem_string):
113 """Convert PEM string to DER format"""
114 # Encode all lines between the first '-----\n' and the 2nd-to-last '-----'.
115 pem_string = pem_string.replace(b"\r", b"")
116 first_idx = pem_string.find(b"-----\n") + 6
117 if pem_string.find(b"-----BEGIN", first_idx) != -1:
118 raise Exception("pem2der() expects only one PEM-encoded object")
119 last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
120 base64_string = pem_string[first_idx:last_idx]
121 base64_string.replace(b"\n", b"")
122 der_string = base64.b64decode(base64_string)
123 return der_string
124
125
126def split_pem(s):
127 """
128 Split PEM objects. Useful to process concatenated certificates.
129 """
130 pem_strings = []
131 while s != b"":
132 start_idx = s.find(b"-----BEGIN")
133 if start_idx == -1:
134 break
135 end_idx = s.find(b"-----END")
136 if end_idx == -1:
137 raise Exception("Invalid PEM object (missing END tag)")
138 end_idx = s.find(b"\n", end_idx) + 1
139 if end_idx == 0:
140 # There is no final \n
141 end_idx = len(s)
142 pem_strings.append(s[start_idx:end_idx])
143 s = s[end_idx:]
144 return pem_strings
145
146
147class _PKIObj(object):
148 def __init__(self, frmt, der):
149 self.frmt = frmt
150 self._der = der
151
152
153class _PKIObjMaker(type):
154 def __call__(cls, obj_path, obj_max_size, pem_marker=None):
155 # This enables transparent DER and PEM-encoded data imports.
156 # Note that when importing a PEM file with multiple objects (like ECDSA
157 # private keys output by openssl), it will concatenate every object in
158 # order to create a 'der' attribute. When converting a 'multi' DER file
159 # into a PEM file, though, the PEM attribute will not be valid,
160 # because we do not try to identify the class of each object.
161 error_msg = "Unable to import data"
162
163 if obj_path is None:
164 raise Exception(error_msg)
165 obj_path = bytes_encode(obj_path)
166
167 if (b'\x00' not in obj_path) and os.path.isfile(obj_path):
168 _size = os.path.getsize(obj_path)
169 if _size > obj_max_size:
170 raise Exception(error_msg)
171 try:
172 with open(obj_path, "rb") as f:
173 _raw = f.read()
174 except Exception:
175 raise Exception(error_msg)
176 else:
177 _raw = obj_path
178
179 try:
180 if b"-----BEGIN" in _raw:
181 frmt = "PEM"
182 pem = _raw
183 der_list = split_pem(pem)
184 der = b''.join(map(pem2der, der_list))
185 else:
186 frmt = "DER"
187 der = _raw
188 except Exception:
189 raise Exception(error_msg)
190
191 p = _PKIObj(frmt, der)
192 return p
193
194
195#####################################################################
196# PKI objects wrappers
197#####################################################################
198
199###############
200# Public Keys #
201###############
202
203class _PubKeyFactory(_PKIObjMaker):
204 """
205 Metaclass for PubKey creation.
206 It casts the appropriate class on the fly, then fills in
207 the appropriate attributes with import_from_asn1pkt() submethod.
208 """
209 def __call__(cls, key_path=None, cryptography_obj=None):
210 # This allows to import cryptography objects directly
211 if cryptography_obj is not None:
212 obj = type.__call__(cls)
213 obj.__class__ = cls
214 obj.frmt = "original"
215 obj.marker = "PUBLIC KEY"
216 obj.pubkey = cryptography_obj
217 return obj
218
219 if key_path is None:
220 obj = type.__call__(cls)
221 if cls is PubKey:
222 cls = PubKeyRSA
223 obj.__class__ = cls
224 obj.frmt = "original"
225 obj.fill_and_store()
226 return obj
227
228 # This deals with the rare RSA 'kx export' call.
229 if isinstance(key_path, tuple):
230 obj = type.__call__(cls)
231 obj.__class__ = PubKeyRSA
232 obj.frmt = "tuple"
233 obj.import_from_tuple(key_path)
234 return obj
235
236 # Now for the usual calls, key_path may be the path to either:
237 # _an X509_SubjectPublicKeyInfo, as processed by openssl;
238 # _an RSAPublicKey;
239 # _an ECDSAPublicKey;
240 # _an EdDSAPublicKey.
241 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
242 try:
243 spki = X509_SubjectPublicKeyInfo(obj._der)
244 pubkey = spki.subjectPublicKey
245 if isinstance(pubkey, RSAPublicKey):
246 obj.__class__ = PubKeyRSA
247 obj.import_from_asn1pkt(pubkey)
248 elif isinstance(pubkey, ECDSAPublicKey):
249 obj.__class__ = PubKeyECDSA
250 obj.import_from_der(obj._der)
251 elif isinstance(pubkey, EdDSAPublicKey):
252 obj.__class__ = PubKeyEdDSA
253 obj.import_from_der(obj._der)
254 else:
255 raise
256 obj.marker = "PUBLIC KEY"
257 except Exception:
258 try:
259 pubkey = RSAPublicKey(obj._der)
260 obj.__class__ = PubKeyRSA
261 obj.import_from_asn1pkt(pubkey)
262 obj.marker = "RSA PUBLIC KEY"
263 except Exception:
264 # We cannot import an ECDSA public key without curve knowledge
265 if conf.debug_dissector:
266 raise
267 raise Exception("Unable to import public key")
268 return obj
269
270
271class PubKey(metaclass=_PubKeyFactory):
272 """
273 Parent class for PubKeyRSA, PubKeyECDSA and PubKeyEdDSA.
274 Provides common verifyCert() and export() methods.
275 """
276
277 def verifyCert(self, cert):
278 """ Verifies either a Cert or an X509_Cert. """
279 tbsCert = cert.tbsCertificate
280 sigAlg = tbsCert.signature
281 h = hash_by_oid[sigAlg.algorithm.val]
282 sigVal = raw(cert.signatureValue)
283 return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs')
284
285 @property
286 def pem(self):
287 return der2pem(self.der, self.marker)
288
289 @property
290 def der(self):
291 return self.pubkey.public_bytes(
292 encoding=serialization.Encoding.DER,
293 format=serialization.PublicFormat.SubjectPublicKeyInfo,
294 )
295
296 def public_numbers(self, *args, **kwargs):
297 return self.pubkey.public_numbers(*args, **kwargs)
298
299 @property
300 def key_size(self):
301 return self.pubkey.key_size
302
303 def export(self, filename, fmt=None):
304 """
305 Export public key in 'fmt' format (DER or PEM) to file 'filename'
306 """
307 if fmt is None:
308 if filename.endswith(".pem"):
309 fmt = "PEM"
310 else:
311 fmt = "DER"
312 with open(filename, "wb") as f:
313 if fmt == "DER":
314 return f.write(self.der)
315 elif fmt == "PEM":
316 return f.write(self.pem.encode())
317
318
319class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
320 """
321 Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py
322 Use the 'key' attribute to access original object.
323 """
324 @crypto_validator
325 def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
326 pubExp = pubExp or 65537
327 if not modulus:
328 real_modulusLen = modulusLen or 2048
329 if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
330 # cryptography > 43.0 compatibility
331 private_key = rust_openssl.rsa.generate_private_key(
332 public_exponent=pubExp,
333 key_size=real_modulusLen,
334 )
335 else:
336 private_key = rsa.generate_private_key(
337 public_exponent=pubExp,
338 key_size=real_modulusLen,
339 backend=default_backend(),
340 )
341 self.pubkey = private_key.public_key()
342 else:
343 real_modulusLen = len(binrepr(modulus))
344 if modulusLen and real_modulusLen != modulusLen:
345 warning("modulus and modulusLen do not match!")
346 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
347 self.pubkey = pubNum.public_key(default_backend())
348
349 self.marker = "PUBLIC KEY"
350
351 # Lines below are only useful for the legacy part of pkcs1.py
352 pubNum = self.pubkey.public_numbers()
353 self._modulusLen = real_modulusLen
354 self._modulus = pubNum.n
355 self._pubExp = pubNum.e
356
357 @crypto_validator
358 def import_from_tuple(self, tup):
359 # this is rarely used
360 e, m, mLen = tup
361 if isinstance(m, bytes):
362 m = pkcs_os2ip(m)
363 if isinstance(e, bytes):
364 e = pkcs_os2ip(e)
365 self.fill_and_store(modulus=m, pubExp=e)
366
367 def import_from_asn1pkt(self, pubkey):
368 modulus = pubkey.modulus.val
369 pubExp = pubkey.publicExponent.val
370 self.fill_and_store(modulus=modulus, pubExp=pubExp)
371
372 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
373 # no ECDSA encryption support, hence no ECDSA specific keywords here
374 return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L)
375
376 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
377 return _EncryptAndVerifyRSA.verify(
378 self, msg, sig, t=t, h=h, mgf=mgf, L=L)
379
380
381class PubKeyECDSA(PubKey):
382 """
383 Wrapper for ECDSA keys based on the cryptography library.
384 Use the 'key' attribute to access original object.
385 """
386 @crypto_validator
387 def fill_and_store(self, curve=None):
388 curve = curve or ec.SECP256R1
389 private_key = ec.generate_private_key(curve(), default_backend())
390 self.pubkey = private_key.public_key()
391
392 @crypto_validator
393 def import_from_der(self, pubkey):
394 # No lib support for explicit curves nor compressed points.
395 self.pubkey = serialization.load_der_public_key(
396 pubkey,
397 backend=default_backend(),
398 )
399
400 def encrypt(self, msg, h="sha256", **kwargs):
401 raise Exception("No ECDSA encryption support")
402
403 @crypto_validator
404 def verify(self, msg, sig, h="sha256", **kwargs):
405 # 'sig' should be a DER-encoded signature, as per RFC 3279
406 try:
407 self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
408 return True
409 except InvalidSignature:
410 return False
411
412
413class PubKeyEdDSA(PubKey):
414 """
415 Wrapper for EdDSA keys based on the cryptography library.
416 Use the 'key' attribute to access original object.
417 """
418 @crypto_validator
419 def fill_and_store(self, curve=None):
420 curve = curve or x25519.X25519PrivateKey
421 private_key = curve.generate()
422 self.pubkey = private_key.public_key()
423
424 @crypto_validator
425 def import_from_der(self, pubkey):
426 self.pubkey = serialization.load_der_public_key(
427 pubkey,
428 backend=default_backend(),
429 )
430
431 def encrypt(self, msg, **kwargs):
432 raise Exception("No EdDSA encryption support")
433
434 @crypto_validator
435 def verify(self, msg, sig, **kwargs):
436 # 'sig' should be a DER-encoded signature, as per RFC 3279
437 try:
438 self.pubkey.verify(sig, msg)
439 return True
440 except InvalidSignature:
441 return False
442
443
444################
445# Private Keys #
446################
447
448class _PrivKeyFactory(_PKIObjMaker):
449 """
450 Metaclass for PrivKey creation.
451 It casts the appropriate class on the fly, then fills in
452 the appropriate attributes with import_from_asn1pkt() submethod.
453 """
454 def __call__(cls, key_path=None, cryptography_obj=None):
455 """
456 key_path may be the path to either:
457 _an RSAPrivateKey_OpenSSL (as generated by openssl);
458 _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
459 _an RSAPrivateKey;
460 _an ECDSAPrivateKey.
461 """
462 if key_path is None:
463 obj = type.__call__(cls)
464 if cls is PrivKey:
465 cls = PrivKeyECDSA
466 obj.__class__ = cls
467 obj.frmt = "original"
468 obj.fill_and_store()
469 return obj
470
471 # This allows to import cryptography objects directly
472 if cryptography_obj is not None:
473 # We (stupidly) need to go through the whole import process because RSA
474 # does more than just importing the cryptography objects...
475 obj = _PKIObj("DER", cryptography_obj.private_bytes(
476 encoding=serialization.Encoding.DER,
477 format=serialization.PrivateFormat.PKCS8,
478 encryption_algorithm=serialization.NoEncryption()
479 ))
480 else:
481 # Load from file
482 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
483
484 try:
485 privkey = RSAPrivateKey_OpenSSL(obj._der)
486 privkey = privkey.privateKey
487 obj.__class__ = PrivKeyRSA
488 obj.marker = "PRIVATE KEY"
489 except Exception:
490 try:
491 privkey = ECDSAPrivateKey_OpenSSL(obj._der)
492 privkey = privkey.privateKey
493 obj.__class__ = PrivKeyECDSA
494 obj.marker = "EC PRIVATE KEY"
495 except Exception:
496 try:
497 privkey = RSAPrivateKey(obj._der)
498 obj.__class__ = PrivKeyRSA
499 obj.marker = "RSA PRIVATE KEY"
500 except Exception:
501 try:
502 privkey = ECDSAPrivateKey(obj._der)
503 obj.__class__ = PrivKeyECDSA
504 obj.marker = "EC PRIVATE KEY"
505 except Exception:
506 try:
507 privkey = EdDSAPrivateKey(obj._der)
508 obj.__class__ = PrivKeyEdDSA
509 obj.marker = "PRIVATE KEY"
510 except Exception:
511 raise Exception("Unable to import private key")
512 try:
513 obj.import_from_asn1pkt(privkey)
514 except ImportError:
515 pass
516 return obj
517
518
519class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING):
520 """A ASN1_BIT_STRING that ignores BER encoding"""
521 def __bytes__(self):
522 return self.val_readable
523 __str__ = __bytes__
524
525
526class PrivKey(metaclass=_PrivKeyFactory):
527 """
528 Parent class for PrivKeyRSA, PrivKeyECDSA and PrivKeyEdDSA.
529 Provides common signTBSCert(), resignCert(), verifyCert()
530 and export() methods.
531 """
532
533 def signTBSCert(self, tbsCert, h="sha256"):
534 """
535 Note that this will always copy the signature field from the
536 tbsCertificate into the signatureAlgorithm field of the result,
537 regardless of the coherence between its contents (which might
538 indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2).
539
540 There is a small inheritance trick for the computation of sigVal
541 below: in order to use a sign() method which would apply
542 to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the
543 subclasses accept any argument, be it from the RSA or ECDSA world,
544 and then they keep the ones they're interested in.
545 Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign().
546 """
547 sigAlg = tbsCert.signature
548 h = h or hash_by_oid[sigAlg.algorithm.val]
549 sigVal = self.sign(raw(tbsCert), h=h, t='pkcs')
550 c = X509_Cert()
551 c.tbsCertificate = tbsCert
552 c.signatureAlgorithm = sigAlg
553 c.signatureValue = _Raw_ASN1_BIT_STRING(sigVal, readable=True)
554 return c
555
556 def resignCert(self, cert):
557 """ Rewrite the signature of either a Cert or an X509_Cert. """
558 return self.signTBSCert(cert.tbsCertificate, h=None)
559
560 def verifyCert(self, cert):
561 """ Verifies either a Cert or an X509_Cert. """
562 tbsCert = cert.tbsCertificate
563 sigAlg = tbsCert.signature
564 h = hash_by_oid[sigAlg.algorithm.val]
565 sigVal = raw(cert.signatureValue)
566 return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs')
567
568 @property
569 def pem(self):
570 return der2pem(self.der, self.marker)
571
572 @property
573 def der(self):
574 return self.key.private_bytes(
575 encoding=serialization.Encoding.DER,
576 format=serialization.PrivateFormat.PKCS8,
577 encryption_algorithm=serialization.NoEncryption()
578 )
579
580 def export(self, filename, fmt=None):
581 """
582 Export private key in 'fmt' format (DER or PEM) to file 'filename'
583 """
584 if fmt is None:
585 if filename.endswith(".pem"):
586 fmt = "PEM"
587 else:
588 fmt = "DER"
589 with open(filename, "wb") as f:
590 if fmt == "DER":
591 return f.write(self.der)
592 elif fmt == "PEM":
593 return f.write(self.pem.encode())
594
595
596class PrivKeyRSA(PrivKey, _DecryptAndSignRSA):
597 """
598 Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py
599 Use the 'key' attribute to access original object.
600 """
601 @crypto_validator
602 def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None,
603 prime1=None, prime2=None, coefficient=None,
604 exponent1=None, exponent2=None, privExp=None):
605 pubExp = pubExp or 65537
606 if None in [modulus, prime1, prime2, coefficient, privExp,
607 exponent1, exponent2]:
608 # note that the library requires every parameter
609 # in order to call RSAPrivateNumbers(...)
610 # if one of these is missing, we generate a whole new key
611 real_modulusLen = modulusLen or 2048
612 if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
613 # cryptography > 43.0 compatibility
614 self.key = rust_openssl.rsa.generate_private_key(
615 public_exponent=pubExp,
616 key_size=real_modulusLen,
617 )
618 else:
619 self.key = rsa.generate_private_key(
620 public_exponent=pubExp,
621 key_size=real_modulusLen,
622 backend=default_backend(),
623 )
624 pubkey = self.key.public_key()
625 else:
626 real_modulusLen = len(binrepr(modulus))
627 if modulusLen and real_modulusLen != modulusLen:
628 warning("modulus and modulusLen do not match!")
629 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
630 privNum = rsa.RSAPrivateNumbers(p=prime1, q=prime2,
631 dmp1=exponent1, dmq1=exponent2,
632 iqmp=coefficient, d=privExp,
633 public_numbers=pubNum)
634 self.key = privNum.private_key(default_backend())
635 pubkey = self.key.public_key()
636
637 self.marker = "PRIVATE KEY"
638
639 # Lines below are only useful for the legacy part of pkcs1.py
640 pubNum = pubkey.public_numbers()
641 self._modulusLen = real_modulusLen
642 self._modulus = pubNum.n
643 self._pubExp = pubNum.e
644
645 self.pubkey = PubKeyRSA((pubNum.e, pubNum.n, real_modulusLen))
646
647 def import_from_asn1pkt(self, privkey):
648 modulus = privkey.modulus.val
649 pubExp = privkey.publicExponent.val
650 privExp = privkey.privateExponent.val
651 prime1 = privkey.prime1.val
652 prime2 = privkey.prime2.val
653 exponent1 = privkey.exponent1.val
654 exponent2 = privkey.exponent2.val
655 coefficient = privkey.coefficient.val
656 self.fill_and_store(modulus=modulus, pubExp=pubExp,
657 privExp=privExp, prime1=prime1, prime2=prime2,
658 exponent1=exponent1, exponent2=exponent2,
659 coefficient=coefficient)
660
661 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
662 return self.pubkey.verify(
663 msg=msg,
664 sig=sig,
665 t=t,
666 h=h,
667 mgf=mgf,
668 L=L,
669 )
670
671 def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
672 return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L)
673
674
675class PrivKeyECDSA(PrivKey):
676 """
677 Wrapper for ECDSA keys based on SigningKey from ecdsa library.
678 Use the 'key' attribute to access original object.
679 """
680 @crypto_validator
681 def fill_and_store(self, curve=None):
682 curve = curve or ec.SECP256R1
683 self.key = ec.generate_private_key(curve(), default_backend())
684 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
685 self.marker = "EC PRIVATE KEY"
686
687 @crypto_validator
688 def import_from_asn1pkt(self, privkey):
689 self.key = serialization.load_der_private_key(raw(privkey), None,
690 backend=default_backend()) # noqa: E501
691 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
692 self.marker = "EC PRIVATE KEY"
693
694 @crypto_validator
695 def verify(self, msg, sig, h="sha256", **kwargs):
696 return self.pubkey.verify(msg=msg, sig=sig, h=h, **kwargs)
697
698 @crypto_validator
699 def sign(self, data, h="sha256", **kwargs):
700 return self.key.sign(data, ec.ECDSA(_get_hash(h)))
701
702
703class PrivKeyEdDSA(PrivKey):
704 """
705 Wrapper for EdDSA keys
706 Use the 'key' attribute to access original object.
707 """
708 @crypto_validator
709 def fill_and_store(self, curve=None):
710 curve = curve or x25519.X25519PrivateKey
711 self.key = curve.generate()
712 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
713 self.marker = "PRIVATE KEY"
714
715 @crypto_validator
716 def import_from_asn1pkt(self, privkey):
717 self.key = serialization.load_der_private_key(raw(privkey), None,
718 backend=default_backend()) # noqa: E501
719 self.pubkey = PubKeyECDSA(cryptography_obj=self.key.public_key())
720 self.marker = "PRIVATE KEY"
721
722 @crypto_validator
723 def verify(self, msg, sig, **kwargs):
724 return self.pubkey.verify(msg=msg, sig=sig, **kwargs)
725
726 @crypto_validator
727 def sign(self, data, **kwargs):
728 return self.key.sign(data)
729
730
731################
732# Certificates #
733################
734
735class _CertMaker(_PKIObjMaker):
736 """
737 Metaclass for Cert creation. It is not necessary as it was for the keys,
738 but we reuse the model instead of creating redundant constructors.
739 """
740 def __call__(cls, cert_path=None, cryptography_obj=None):
741 # This allows to import cryptography objects directly
742 if cryptography_obj is not None:
743 obj = _PKIObj("DER", cryptography_obj.public_bytes(
744 encoding=serialization.Encoding.DER,
745 ))
746 else:
747 # Load from file
748 obj = _PKIObjMaker.__call__(cls, cert_path,
749 _MAX_CERT_SIZE, "CERTIFICATE")
750 obj.__class__ = Cert
751 obj.marker = "CERTIFICATE"
752 try:
753 cert = X509_Cert(obj._der)
754 except Exception:
755 if conf.debug_dissector:
756 raise
757 raise Exception("Unable to import certificate")
758 obj.import_from_asn1pkt(cert)
759 return obj
760
761
762class Cert(metaclass=_CertMaker):
763 """
764 Wrapper for the X509_Cert from layers/x509.py.
765 Use the 'x509Cert' attribute to access original object.
766 """
767
768 def import_from_asn1pkt(self, cert):
769 error_msg = "Unable to import certificate"
770
771 self.x509Cert = cert
772
773 tbsCert = cert.tbsCertificate
774 self.tbsCertificate = tbsCert
775
776 if tbsCert.version:
777 self.version = tbsCert.version.val + 1
778 else:
779 self.version = 1
780 self.serial = tbsCert.serialNumber.val
781 self.sigAlg = tbsCert.signature.algorithm.oidname
782 self.issuer = tbsCert.get_issuer()
783 self.issuer_str = tbsCert.get_issuer_str()
784 self.issuer_hash = hash(self.issuer_str)
785 self.subject = tbsCert.get_subject()
786 self.subject_str = tbsCert.get_subject_str()
787 self.subject_hash = hash(self.subject_str)
788 self.authorityKeyID = None
789
790 self.notBefore_str = tbsCert.validity.not_before.pretty_time
791 try:
792 self.notBefore = tbsCert.validity.not_before.datetime.timetuple()
793 except ValueError:
794 raise Exception(error_msg)
795 self.notBefore_str_simple = time.strftime("%x", self.notBefore)
796
797 self.notAfter_str = tbsCert.validity.not_after.pretty_time
798 try:
799 self.notAfter = tbsCert.validity.not_after.datetime.timetuple()
800 except ValueError:
801 raise Exception(error_msg)
802 self.notAfter_str_simple = time.strftime("%x", self.notAfter)
803
804 self.pubKey = PubKey(raw(tbsCert.subjectPublicKeyInfo))
805
806 if tbsCert.extensions:
807 for extn in tbsCert.extensions:
808 if extn.extnID.oidname == "basicConstraints":
809 self.cA = False
810 if extn.extnValue.cA:
811 self.cA = not (extn.extnValue.cA.val == 0)
812 elif extn.extnID.oidname == "keyUsage":
813 self.keyUsage = extn.extnValue.get_keyUsage()
814 elif extn.extnID.oidname == "extKeyUsage":
815 self.extKeyUsage = extn.extnValue.get_extendedKeyUsage()
816 elif extn.extnID.oidname == "authorityKeyIdentifier":
817 self.authorityKeyID = extn.extnValue.keyIdentifier.val
818
819 self.signatureValue = raw(cert.signatureValue)
820 self.signatureLen = len(self.signatureValue)
821
822 def isIssuerCert(self, other):
823 """
824 True if 'other' issued 'self', i.e.:
825 - self.issuer == other.subject
826 - self is signed by other
827 """
828 if self.issuer_hash != other.subject_hash:
829 return False
830 return other.pubKey.verifyCert(self)
831
832 def isSelfSigned(self):
833 """
834 Return True if the certificate is self-signed:
835 - issuer and subject are the same
836 - the signature of the certificate is valid.
837 """
838 if self.issuer_hash == self.subject_hash:
839 return self.isIssuerCert(self)
840 return False
841
842 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
843 # no ECDSA *encryption* support, hence only RSA specific keywords here
844 return self.pubKey.encrypt(msg, t=t, h=h, mgf=mgf, L=L)
845
846 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
847 return self.pubKey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)
848
849 def getSignatureHash(self):
850 """
851 Return the hash used by the 'signatureAlgorithm'
852 """
853 tbsCert = self.tbsCertificate
854 sigAlg = tbsCert.signature
855 h = hash_by_oid[sigAlg.algorithm.val]
856 return _get_hash(h)
857
858 def setSubjectPublicKeyFromPrivateKey(self, key):
859 """
860 Replace the subjectPublicKeyInfo of this certificate with the one from
861 the provided key.
862 """
863 if isinstance(key, (PubKey, PrivKey)):
864 if isinstance(key, PrivKey):
865 pubkey = key.pubkey
866 else:
867 pubkey = key
868 self.tbsCertificate.subjectPublicKeyInfo = X509_SubjectPublicKeyInfo(
869 pubkey.der
870 )
871 else:
872 raise ValueError("Unknown type 'key', should be PubKey or PrivKey")
873
874 def resignWith(self, key):
875 """
876 Resign a certificate with a specific key
877 """
878 self.import_from_asn1pkt(key.resignCert(self))
879
880 def remainingDays(self, now=None):
881 """
882 Based on the value of notAfter field, returns the number of
883 days the certificate will still be valid. The date used for the
884 comparison is the current and local date, as returned by
885 time.localtime(), except if 'now' argument is provided another
886 one. 'now' argument can be given as either a time tuple or a string
887 representing the date. Accepted format for the string version
888 are:
889
890 - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
891 - '%m/%d/%y' e.g. '01/30/08' (less precise)
892
893 If the certificate is no more valid at the date considered, then
894 a negative value is returned representing the number of days
895 since it has expired.
896
897 The number of days is returned as a float to deal with the unlikely
898 case of certificates that are still just valid.
899 """
900 if now is None:
901 now = time.localtime()
902 elif isinstance(now, str):
903 try:
904 if '/' in now:
905 now = time.strptime(now, '%m/%d/%y')
906 else:
907 now = time.strptime(now, '%b %d %H:%M:%S %Y %Z')
908 except Exception:
909 warning("Bad time string provided, will use localtime() instead.") # noqa: E501
910 now = time.localtime()
911
912 now = time.mktime(now)
913 nft = time.mktime(self.notAfter)
914 diff = (nft - now) / (24. * 3600)
915 return diff
916
917 def isRevoked(self, crl_list):
918 """
919 Given a list of trusted CRL (their signature has already been
920 verified with trusted anchors), this function returns True if
921 the certificate is marked as revoked by one of those CRL.
922
923 Note that if the Certificate was on hold in a previous CRL and
924 is now valid again in a new CRL and bot are in the list, it
925 will be considered revoked: this is because _all_ CRLs are
926 checked (not only the freshest) and revocation status is not
927 handled.
928
929 Also note that the check on the issuer is performed on the
930 Authority Key Identifier if available in _both_ the CRL and the
931 Cert. Otherwise, the issuers are simply compared.
932 """
933 for c in crl_list:
934 if (self.authorityKeyID is not None and
935 c.authorityKeyID is not None and
936 self.authorityKeyID == c.authorityKeyID):
937 return self.serial in (x[0] for x in c.revoked_cert_serials)
938 elif self.issuer == c.issuer:
939 return self.serial in (x[0] for x in c.revoked_cert_serials)
940 return False
941
942 @property
943 def pem(self):
944 return der2pem(self.der, self.marker)
945
946 @property
947 def der(self):
948 return bytes(self.x509Cert)
949
950 def export(self, filename, fmt=None):
951 """
952 Export certificate in 'fmt' format (DER or PEM) to file 'filename'
953 """
954 if fmt is None:
955 if filename.endswith(".pem"):
956 fmt = "PEM"
957 else:
958 fmt = "DER"
959 with open(filename, "wb") as f:
960 if fmt == "DER":
961 return f.write(self.der)
962 elif fmt == "PEM":
963 return f.write(self.pem.encode())
964
965 def show(self):
966 print("Serial: %s" % self.serial)
967 print("Issuer: " + self.issuer_str)
968 print("Subject: " + self.subject_str)
969 print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str))
970
971 def __repr__(self):
972 return "[X.509 Cert. Subject:%s, Issuer:%s]" % (self.subject_str, self.issuer_str) # noqa: E501
973
974
975################################
976# Certificate Revocation Lists #
977################################
978
979class _CRLMaker(_PKIObjMaker):
980 """
981 Metaclass for CRL creation. It is not necessary as it was for the keys,
982 but we reuse the model instead of creating redundant constructors.
983 """
984 def __call__(cls, cert_path):
985 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL")
986 obj.__class__ = CRL
987 try:
988 crl = X509_CRL(obj._der)
989 except Exception:
990 raise Exception("Unable to import CRL")
991 obj.import_from_asn1pkt(crl)
992 return obj
993
994
995class CRL(metaclass=_CRLMaker):
996 """
997 Wrapper for the X509_CRL from layers/x509.py.
998 Use the 'x509CRL' attribute to access original object.
999 """
1000
1001 def import_from_asn1pkt(self, crl):
1002 error_msg = "Unable to import CRL"
1003
1004 self.x509CRL = crl
1005
1006 tbsCertList = crl.tbsCertList
1007 self.tbsCertList = raw(tbsCertList)
1008
1009 if tbsCertList.version:
1010 self.version = tbsCertList.version.val + 1
1011 else:
1012 self.version = 1
1013 self.sigAlg = tbsCertList.signature.algorithm.oidname
1014 self.issuer = tbsCertList.get_issuer()
1015 self.issuer_str = tbsCertList.get_issuer_str()
1016 self.issuer_hash = hash(self.issuer_str)
1017
1018 self.lastUpdate_str = tbsCertList.this_update.pretty_time
1019 lastUpdate = tbsCertList.this_update.val
1020 if lastUpdate[-1] == "Z":
1021 lastUpdate = lastUpdate[:-1]
1022 try:
1023 self.lastUpdate = time.strptime(lastUpdate, "%y%m%d%H%M%S")
1024 except Exception:
1025 raise Exception(error_msg)
1026 self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)
1027
1028 self.nextUpdate = None
1029 self.nextUpdate_str_simple = None
1030 if tbsCertList.next_update:
1031 self.nextUpdate_str = tbsCertList.next_update.pretty_time
1032 nextUpdate = tbsCertList.next_update.val
1033 if nextUpdate[-1] == "Z":
1034 nextUpdate = nextUpdate[:-1]
1035 try:
1036 self.nextUpdate = time.strptime(nextUpdate, "%y%m%d%H%M%S")
1037 except Exception:
1038 raise Exception(error_msg)
1039 self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)
1040
1041 if tbsCertList.crlExtensions:
1042 for extension in tbsCertList.crlExtensions:
1043 if extension.extnID.oidname == "cRLNumber":
1044 self.number = extension.extnValue.cRLNumber.val
1045
1046 revoked = []
1047 if tbsCertList.revokedCertificates:
1048 for cert in tbsCertList.revokedCertificates:
1049 serial = cert.serialNumber.val
1050 date = cert.revocationDate.val
1051 if date[-1] == "Z":
1052 date = date[:-1]
1053 try:
1054 time.strptime(date, "%y%m%d%H%M%S")
1055 except Exception:
1056 raise Exception(error_msg)
1057 revoked.append((serial, date))
1058 self.revoked_cert_serials = revoked
1059
1060 self.signatureValue = raw(crl.signatureValue)
1061 self.signatureLen = len(self.signatureValue)
1062
1063 def isIssuerCert(self, other):
1064 # This is exactly the same thing as in Cert method.
1065 if self.issuer_hash != other.subject_hash:
1066 return False
1067 return other.pubKey.verifyCert(self)
1068
1069 def verify(self, anchors):
1070 # Return True iff the CRL is signed by one of the provided anchors.
1071 return any(self.isIssuerCert(a) for a in anchors)
1072
1073 def show(self):
1074 print("Version: %d" % self.version)
1075 print("sigAlg: " + self.sigAlg)
1076 print("Issuer: " + self.issuer_str)
1077 print("lastUpdate: %s" % self.lastUpdate_str)
1078 print("nextUpdate: %s" % self.nextUpdate_str)
1079
1080
1081######################
1082# Certificate chains #
1083######################
1084
1085class Chain(list):
1086 """
1087 Basically, an enhanced array of Cert.
1088 """
1089
1090 def __init__(self, certList, cert0=None):
1091 """
1092 Construct a chain of certificates starting with a self-signed
1093 certificate (or any certificate submitted by the user)
1094 and following issuer/subject matching and signature validity.
1095 If there is exactly one chain to be constructed, it will be,
1096 but if there are multiple potential chains, there is no guarantee
1097 that the retained one will be the longest one.
1098 As Cert and CRL classes both share an isIssuerCert() method,
1099 the trailing element of a Chain may alternatively be a CRL.
1100
1101 Note that we do not check AKID/{SKID/issuer/serial} matching,
1102 nor the presence of keyCertSign in keyUsage extension (if present).
1103 """
1104 list.__init__(self, ())
1105 if cert0:
1106 self.append(cert0)
1107 else:
1108 for root_candidate in certList:
1109 if root_candidate.isSelfSigned():
1110 self.append(root_candidate)
1111 certList.remove(root_candidate)
1112 break
1113
1114 if len(self) > 0:
1115 while certList:
1116 tmp_len = len(self)
1117 for c in certList:
1118 if c.isIssuerCert(self[-1]):
1119 self.append(c)
1120 certList.remove(c)
1121 break
1122 if len(self) == tmp_len:
1123 # no new certificate appended to self
1124 break
1125
1126 def verifyChain(self, anchors, untrusted=None):
1127 """
1128 Perform verification of certificate chains for that certificate.
1129 A list of anchors is required. The certificates in the optional
1130 untrusted list may be used as additional elements to the final chain.
1131 On par with chain instantiation, only one chain constructed with the
1132 untrusted candidates will be retained. Eventually, dates are checked.
1133 """
1134 untrusted = untrusted or []
1135 for a in anchors:
1136 chain = Chain(self + untrusted, a)
1137 if len(chain) == 1: # anchor only
1138 continue
1139 # check that the chain does not exclusively rely on untrusted
1140 if any(c in chain[1:] for c in self):
1141 for c in chain:
1142 if c.remainingDays() < 0:
1143 break
1144 if c is chain[-1]: # we got to the end of the chain
1145 return chain
1146 return None
1147
1148 def verifyChainFromCAFile(self, cafile, untrusted_file=None):
1149 """
1150 Does the same job as .verifyChain() but using the list of anchors
1151 from the cafile. As for .verifyChain(), a list of untrusted
1152 certificates can be passed (as a file, this time).
1153 """
1154 try:
1155 with open(cafile, "rb") as f:
1156 ca_certs = f.read()
1157 except Exception:
1158 raise Exception("Could not read from cafile")
1159
1160 anchors = [Cert(c) for c in split_pem(ca_certs)]
1161
1162 untrusted = None
1163 if untrusted_file:
1164 try:
1165 with open(untrusted_file, "rb") as f:
1166 untrusted_certs = f.read()
1167 except Exception:
1168 raise Exception("Could not read from untrusted_file")
1169 untrusted = [Cert(c) for c in split_pem(untrusted_certs)]
1170
1171 return self.verifyChain(anchors, untrusted)
1172
1173 def verifyChainFromCAPath(self, capath, untrusted_file=None):
1174 """
1175 Does the same job as .verifyChainFromCAFile() but using the list
1176 of anchors in capath directory. The directory should (only) contain
1177 certificates files in PEM format. As for .verifyChainFromCAFile(),
1178 a list of untrusted certificates can be passed as a file
1179 (concatenation of the certificates in PEM format).
1180 """
1181 try:
1182 anchors = []
1183 for cafile in os.listdir(capath):
1184 with open(os.path.join(capath, cafile), "rb") as fd:
1185 anchors.append(Cert(fd.read()))
1186 except Exception:
1187 raise Exception("capath provided is not a valid cert path")
1188
1189 untrusted = None
1190 if untrusted_file:
1191 try:
1192 with open(untrusted_file, "rb") as f:
1193 untrusted_certs = f.read()
1194 except Exception:
1195 raise Exception("Could not read from untrusted_file")
1196 untrusted = [Cert(c) for c in split_pem(untrusted_certs)]
1197
1198 return self.verifyChain(anchors, untrusted)
1199
1200 def __repr__(self):
1201 llen = len(self) - 1
1202 if llen < 0:
1203 return ""
1204 c = self[0]
1205 s = "__ "
1206 if not c.isSelfSigned():
1207 s += "%s [Not Self Signed]\n" % c.subject_str
1208 else:
1209 s += "%s [Self Signed]\n" % c.subject_str
1210 idx = 1
1211 while idx <= llen:
1212 c = self[idx]
1213 s += "%s_ %s" % (" " * idx * 2, c.subject_str)
1214 if idx != llen:
1215 s += "\n"
1216 idx += 1
1217 return s