Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/oscrypto/_asymmetric.py: 47%
362 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:25 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:25 +0000
1# coding: utf-8
2from __future__ import unicode_literals, division, absolute_import, print_function
4import hashlib
5import hmac
6import re
7import binascii
9from ._asn1 import (
10 CertBag,
11 Certificate,
12 DSAPrivateKey,
13 ECPrivateKey,
14 EncryptedData,
15 EncryptedPrivateKeyInfo,
16 Integer,
17 OctetString,
18 Pfx,
19 PrivateKeyInfo,
20 PublicKeyInfo,
21 RSAPrivateKey,
22 RSAPublicKey,
23 SafeContents,
24 unarmor,
25)
27from .kdf import pbkdf1, pbkdf2, pkcs12_kdf
28from .symmetric import (
29 aes_cbc_pkcs7_decrypt,
30 des_cbc_pkcs5_decrypt,
31 rc2_cbc_pkcs5_decrypt,
32 rc4_decrypt,
33 tripledes_cbc_pkcs5_decrypt,
34)
35from .util import constant_compare
36from ._errors import pretty_message
37from ._types import byte_cls, str_cls, type_name
40class _PrivateKeyBase():
42 asn1 = None
43 _fingerprint = None
45 def unwrap(self):
46 """
47 Unwraps the private key into an asn1crypto.keys.RSAPrivateKey,
48 asn1crypto.keys.DSAPrivateKey or asn1crypto.keys.ECPrivateKey object
50 :return:
51 An asn1crypto.keys.RSAPrivateKey, asn1crypto.keys.DSAPrivateKey or
52 asn1crypto.keys.ECPrivateKey object
53 """
55 if self.algorithm == 'rsa':
56 return self.asn1['private_key'].parsed
58 if self.algorithm == 'dsa':
59 params = self.asn1['private_key_algorithm']['parameters']
60 return DSAPrivateKey({
61 'version': 0,
62 'p': params['p'],
63 'q': params['q'],
64 'g': params['g'],
65 'public_key': self.public_key.unwrap(),
66 'private_key': self.asn1['private_key'].parsed,
67 })
69 if self.algorithm == 'ec':
70 output = self.asn1['private_key'].parsed
71 output['parameters'] = self.asn1['private_key_algorithm']['parameters']
72 output['public_key'] = self.public_key.unwrap()
73 return output
75 @property
76 def algorithm(self):
77 """
78 :return:
79 A unicode string of "rsa", "dsa" or "ec"
80 """
82 return self.asn1.algorithm
84 @property
85 def curve(self):
86 """
87 :return:
88 A unicode string of EC curve name
89 """
91 return self.asn1.curve[1]
93 @property
94 def bit_size(self):
95 """
96 :return:
97 The number of bits in the key, as an integer
98 """
100 return self.asn1.bit_size
102 @property
103 def byte_size(self):
104 """
105 :return:
106 The number of bytes in the key, as an integer
107 """
109 return self.asn1.byte_size
112class _PublicKeyBase():
114 asn1 = None
115 _fingerprint = None
117 def unwrap(self):
118 """
119 Unwraps a public key into an asn1crypto.keys.RSAPublicKey,
120 asn1crypto.core.Integer (for DSA) or asn1crypto.keys.ECPointBitString
121 object
123 :return:
124 An asn1crypto.keys.RSAPublicKey, asn1crypto.core.Integer or
125 asn1crypto.keys.ECPointBitString object
126 """
128 if self.algorithm == 'ec':
129 return self.asn1['public_key']
130 return self.asn1['public_key'].parsed
132 @property
133 def fingerprint(self):
134 """
135 Creates a fingerprint that can be compared with a private key to see if
136 the two form a pair.
138 This fingerprint is not compatible with fingerprints generated by any
139 other software.
141 :return:
142 A byte string that is a sha256 hash of selected components (based
143 on the key type)
144 """
146 if self._fingerprint is None:
147 self._fingerprint = _fingerprint(self.asn1, None)
148 return self._fingerprint
150 @property
151 def algorithm(self):
152 """
153 :return:
154 A unicode string of "rsa", "dsa" or "ec"
155 """
157 return self.asn1.algorithm
159 @property
160 def curve(self):
161 """
162 :return:
163 A unicode string of EC curve name
164 """
166 return self.asn1.curve[1]
168 @property
169 def bit_size(self):
170 """
171 :return:
172 The number of bits in the key, as an integer
173 """
175 return self.asn1.bit_size
177 @property
178 def byte_size(self):
179 """
180 :return:
181 The number of bytes in the key, as an integer
182 """
184 return self.asn1.byte_size
187class _CertificateBase():
189 asn1 = None
191 @property
192 def algorithm(self):
193 """
194 :return:
195 A unicode string of "rsa", "dsa" or "ec"
196 """
198 return self.public_key.algorithm
200 @property
201 def curve(self):
202 """
203 :return:
204 A unicode string of EC curve name
205 """
207 return self.public_key.curve
209 @property
210 def bit_size(self):
211 """
212 :return:
213 The number of bits in the public key, as an integer
214 """
216 return self.public_key.bit_size
218 @property
219 def byte_size(self):
220 """
221 :return:
222 The number of bytes in the public key, as an integer
223 """
225 return self.public_key.byte_size
228def _unwrap_private_key_info(key_info):
229 """
230 Unwraps an asn1crypto.keys.PrivateKeyInfo object into an
231 asn1crypto.keys.RSAPrivateKey, asn1crypto.keys.DSAPrivateKey
232 or asn1crypto.keys.ECPrivateKey.
234 :param key_info:
235 An asn1crypto.keys.PrivateKeyInfo object
237 :return:
238 One of:
239 - asn1crypto.keys.RSAPrivateKey
240 - asn1crypto.keys.DSAPrivateKey
241 - asn1crypto.keys.ECPrivateKey
242 """
244 key_alg = key_info.algorithm
246 if key_alg == 'rsa' or key_alg == 'rsassa_pss':
247 return key_info['private_key'].parsed
249 if key_alg == 'dsa':
250 params = key_info['private_key_algorithm']['parameters']
251 parsed = key_info['private_key'].parsed
252 return DSAPrivateKey({
253 'version': 0,
254 'p': params['p'],
255 'q': params['q'],
256 'g': params['g'],
257 'public_key': Integer(pow(
258 params['g'].native,
259 parsed.native,
260 params['p'].native
261 )),
262 'private_key': parsed,
263 })
265 if key_alg == 'ec':
266 parsed = key_info['private_key'].parsed
267 parsed['parameters'] = key_info['private_key_algorithm']['parameters']
268 return parsed
270 raise ValueError('Unsupported key_info.algorithm "%s"' % key_info.algorithm)
273def _fingerprint(key_object, load_private_key):
274 """
275 Returns a fingerprint used for correlating public keys and private keys
277 :param key_object:
278 An asn1crypto.keys.PrivateKeyInfo or asn1crypto.keys.PublicKeyInfo
280 :raises:
281 ValueError - when the key_object is not of the proper type
283 ;return:
284 A byte string fingerprint
285 """
287 if isinstance(key_object, PrivateKeyInfo):
288 key = key_object['private_key'].parsed
290 if key_object.algorithm == 'rsa':
291 to_hash = '%d:%d' % (
292 key['modulus'].native,
293 key['public_exponent'].native,
294 )
296 elif key_object.algorithm == 'dsa':
297 params = key_object['private_key_algorithm']['parameters']
298 public_key = Integer(pow(
299 params['g'].native,
300 key_object['private_key'].parsed.native,
301 params['p'].native
302 ))
304 to_hash = '%d:%d:%d:%d' % (
305 params['p'].native,
306 params['q'].native,
307 params['g'].native,
308 public_key.native,
309 )
311 elif key_object.algorithm == 'ec':
312 public_key = key['public_key'].native
313 if public_key is None:
314 # This is gross, but since the EC public key is optional,
315 # and we need to load the private key and use the crypto lib
316 # to get the public key, we have to import the platform-specific
317 # asymmetric implementation. This is the reason a bunch of the
318 # imports are module imports, so we don't get an import cycle.
319 public_key_object = load_private_key(key_object).public_key
320 public_key = public_key_object.asn1['public_key'].parsed.native
322 to_hash = '%s:' % key_object.curve[1]
323 to_hash = to_hash.encode('utf-8')
324 to_hash += public_key
326 if isinstance(to_hash, str_cls):
327 to_hash = to_hash.encode('utf-8')
329 return hashlib.sha256(to_hash).digest()
331 if isinstance(key_object, PublicKeyInfo):
332 if key_object.algorithm == 'rsa':
333 key = key_object['public_key'].parsed
335 to_hash = '%d:%d' % (
336 key['modulus'].native,
337 key['public_exponent'].native,
338 )
340 elif key_object.algorithm == 'dsa':
341 key = key_object['public_key'].parsed
342 params = key_object['algorithm']['parameters']
344 to_hash = '%d:%d:%d:%d' % (
345 params['p'].native,
346 params['q'].native,
347 params['g'].native,
348 key.native,
349 )
351 elif key_object.algorithm == 'ec':
352 public_key = key_object['public_key'].native
354 to_hash = '%s:' % key_object.curve[1]
355 to_hash = to_hash.encode('utf-8')
356 to_hash += public_key
358 if isinstance(to_hash, str_cls):
359 to_hash = to_hash.encode('utf-8')
361 return hashlib.sha256(to_hash).digest()
363 raise ValueError(pretty_message(
364 '''
365 key_object must be an instance of the
366 asn1crypto.keys.PrivateKeyInfo or asn1crypto.keys.PublicKeyInfo
367 classes, not %s
368 ''',
369 type_name(key_object)
370 ))
373crypto_funcs = {
374 'rc2': rc2_cbc_pkcs5_decrypt,
375 'rc4': rc4_decrypt,
376 'des': des_cbc_pkcs5_decrypt,
377 'tripledes': tripledes_cbc_pkcs5_decrypt,
378 'aes': aes_cbc_pkcs7_decrypt,
379}
382def parse_public(data):
383 """
384 Loads a public key from a DER or PEM-formatted file. Supports RSA, DSA and
385 EC public keys. For RSA keys, both the old RSAPublicKey and
386 SubjectPublicKeyInfo structures are supported. Also allows extracting a
387 public key from an X.509 certificate.
389 :param data:
390 A byte string to load the public key from
392 :raises:
393 ValueError - when the data does not appear to contain a public key
395 :return:
396 An asn1crypto.keys.PublicKeyInfo object
397 """
399 if not isinstance(data, byte_cls):
400 raise TypeError(pretty_message(
401 '''
402 data must be a byte string, not %s
403 ''',
404 type_name(data)
405 ))
407 key_type = None
409 # Appears to be PEM formatted
410 if re.match(b'\\s*-----', data) is not None:
411 key_type, algo, data = _unarmor_pem(data)
413 if key_type == 'private key':
414 raise ValueError(pretty_message(
415 '''
416 The data specified does not appear to be a public key or
417 certificate, but rather a private key
418 '''
419 ))
421 # When a public key returning from _unarmor_pem has a known algorithm
422 # of RSA, that means the DER structure is of the type RSAPublicKey, so
423 # we need to wrap it in the PublicKeyInfo structure.
424 if algo == 'rsa':
425 return PublicKeyInfo.wrap(data, 'rsa')
427 if key_type is None or key_type == 'public key':
428 try:
429 pki = PublicKeyInfo.load(data)
430 # Call .native to fully parse since asn1crypto is lazy
431 pki.native
432 return pki
433 except (ValueError):
434 pass # Data was not PublicKeyInfo
436 try:
437 rpk = RSAPublicKey.load(data)
438 # Call .native to fully parse since asn1crypto is lazy
439 rpk.native
440 return PublicKeyInfo.wrap(rpk, 'rsa')
441 except (ValueError):
442 pass # Data was not an RSAPublicKey
444 if key_type is None or key_type == 'certificate':
445 try:
446 parsed_cert = Certificate.load(data)
447 key_info = parsed_cert['tbs_certificate']['subject_public_key_info']
448 return key_info
449 except (ValueError):
450 pass # Data was not a cert
452 raise ValueError('The data specified does not appear to be a known public key or certificate format')
455def parse_certificate(data):
456 """
457 Loads a certificate from a DER or PEM-formatted file. Supports X.509
458 certificates only.
460 :param data:
461 A byte string to load the certificate from
463 :raises:
464 ValueError - when the data does not appear to contain a certificate
466 :return:
467 An asn1crypto.x509.Certificate object
468 """
470 if not isinstance(data, byte_cls):
471 raise TypeError(pretty_message(
472 '''
473 data must be a byte string, not %s
474 ''',
475 type_name(data)
476 ))
478 key_type = None
480 # Appears to be PEM formatted
481 if re.match(b'\\s*-----', data) is not None:
482 key_type, _, data = _unarmor_pem(data)
484 if key_type == 'private key':
485 raise ValueError(pretty_message(
486 '''
487 The data specified does not appear to be a certificate, but
488 rather a private key
489 '''
490 ))
492 if key_type == 'public key':
493 raise ValueError(pretty_message(
494 '''
495 The data specified does not appear to be a certificate, but
496 rather a public key
497 '''
498 ))
500 if key_type is None or key_type == 'certificate':
501 try:
502 return Certificate.load(data)
503 except (ValueError):
504 pass # Data was not a Certificate
506 raise ValueError(pretty_message(
507 '''
508 The data specified does not appear to be a known certificate format
509 '''
510 ))
513def parse_private(data, password=None):
514 """
515 Loads a private key from a DER or PEM-formatted file. Supports RSA, DSA and
516 EC private keys. Works with the follow formats:
518 - RSAPrivateKey (PKCS#1)
519 - ECPrivateKey (SECG SEC1 V2)
520 - DSAPrivateKey (OpenSSL)
521 - PrivateKeyInfo (RSA/DSA/EC - PKCS#8)
522 - EncryptedPrivateKeyInfo (RSA/DSA/EC - PKCS#8)
523 - Encrypted RSAPrivateKey (PEM only, OpenSSL)
524 - Encrypted DSAPrivateKey (PEM only, OpenSSL)
525 - Encrypted ECPrivateKey (PEM only, OpenSSL)
527 :param data:
528 A byte string to load the private key from
530 :param password:
531 The password to unencrypt the private key
533 :raises:
534 ValueError - when the data does not appear to contain a private key, or the password is invalid
536 :return:
537 An asn1crypto.keys.PrivateKeyInfo object
538 """
540 if not isinstance(data, byte_cls):
541 raise TypeError(pretty_message(
542 '''
543 data must be a byte string, not %s
544 ''',
545 type_name(data)
546 ))
548 if password is not None:
549 if not isinstance(password, byte_cls):
550 raise TypeError(pretty_message(
551 '''
552 password must be a byte string, not %s
553 ''',
554 type_name(password)
555 ))
556 else:
557 password = b''
559 # Appears to be PEM formatted
560 if re.match(b'\\s*-----', data) is not None:
561 key_type, _, data = _unarmor_pem(data, password)
563 if key_type == 'public key':
564 raise ValueError(pretty_message(
565 '''
566 The data specified does not appear to be a private key, but
567 rather a public key
568 '''
569 ))
571 if key_type == 'certificate':
572 raise ValueError(pretty_message(
573 '''
574 The data specified does not appear to be a private key, but
575 rather a certificate
576 '''
577 ))
579 try:
580 pki = PrivateKeyInfo.load(data)
581 # Call .native to fully parse since asn1crypto is lazy
582 pki.native
583 return pki
584 except (ValueError):
585 pass # Data was not PrivateKeyInfo
587 try:
588 parsed_wrapper = EncryptedPrivateKeyInfo.load(data)
589 encryption_algorithm_info = parsed_wrapper['encryption_algorithm']
590 encrypted_data = parsed_wrapper['encrypted_data'].native
591 decrypted_data = _decrypt_encrypted_data(encryption_algorithm_info, encrypted_data, password)
592 pki = PrivateKeyInfo.load(decrypted_data)
593 # Call .native to fully parse since asn1crypto is lazy
594 pki.native
595 return pki
596 except (ValueError):
597 pass # Data was not EncryptedPrivateKeyInfo
599 try:
600 parsed = RSAPrivateKey.load(data)
601 # Call .native to fully parse since asn1crypto is lazy
602 parsed.native
603 return PrivateKeyInfo.wrap(parsed, 'rsa')
604 except (ValueError):
605 pass # Data was not an RSAPrivateKey
607 try:
608 parsed = DSAPrivateKey.load(data)
609 # Call .native to fully parse since asn1crypto is lazy
610 parsed.native
611 return PrivateKeyInfo.wrap(parsed, 'dsa')
612 except (ValueError):
613 pass # Data was not a DSAPrivateKey
615 try:
616 parsed = ECPrivateKey.load(data)
617 # Call .native to fully parse since asn1crypto is lazy
618 parsed.native
619 return PrivateKeyInfo.wrap(parsed, 'ec')
620 except (ValueError):
621 pass # Data was not an ECPrivateKey
623 raise ValueError(pretty_message(
624 '''
625 The data specified does not appear to be a known private key format
626 '''
627 ))
630def _unarmor_pem(data, password=None):
631 """
632 Removes PEM-encoding from a public key, private key or certificate. If the
633 private key is encrypted, the password will be used to decrypt it.
635 :param data:
636 A byte string of the PEM-encoded data
638 :param password:
639 A byte string of the encryption password, or None
641 :return:
642 A 3-element tuple in the format: (key_type, algorithm, der_bytes). The
643 key_type will be a unicode string of "public key", "private key" or
644 "certificate". The algorithm will be a unicode string of "rsa", "dsa"
645 or "ec".
646 """
648 object_type, headers, der_bytes = unarmor(data)
650 type_regex = '^((DSA|EC|RSA) PRIVATE KEY|ENCRYPTED PRIVATE KEY|PRIVATE KEY|PUBLIC KEY|RSA PUBLIC KEY|CERTIFICATE)'
651 armor_type = re.match(type_regex, object_type)
652 if not armor_type:
653 raise ValueError(pretty_message(
654 '''
655 data does not seem to contain a PEM-encoded certificate, private
656 key or public key
657 '''
658 ))
660 pem_header = armor_type.group(1)
662 data = data.strip()
664 # RSA private keys are encrypted after being DER-encoded, but before base64
665 # encoding, so they need to be handled specially
666 if pem_header in set(['RSA PRIVATE KEY', 'DSA PRIVATE KEY', 'EC PRIVATE KEY']):
667 algo = armor_type.group(2).lower()
668 return ('private key', algo, _unarmor_pem_openssl_private(headers, der_bytes, password))
670 key_type = pem_header.lower()
671 algo = None
672 if key_type == 'encrypted private key':
673 key_type = 'private key'
674 elif key_type == 'rsa public key':
675 key_type = 'public key'
676 algo = 'rsa'
678 return (key_type, algo, der_bytes)
681def _unarmor_pem_openssl_private(headers, data, password):
682 """
683 Parses a PKCS#1 private key, or encrypted private key
685 :param headers:
686 A dict of "Name: Value" lines from right after the PEM header
688 :param data:
689 A byte string of the DER-encoded PKCS#1 private key
691 :param password:
692 A byte string of the password to use if the private key is encrypted
694 :return:
695 A byte string of the DER-encoded private key
696 """
698 enc_algo = None
699 enc_iv_hex = None
700 enc_iv = None
702 if 'DEK-Info' in headers:
703 params = headers['DEK-Info']
704 if params.find(',') != -1:
705 enc_algo, enc_iv_hex = params.strip().split(',')
706 else:
707 enc_algo = 'RC4'
709 if not enc_algo:
710 return data
712 if enc_iv_hex:
713 enc_iv = binascii.unhexlify(enc_iv_hex.encode('ascii'))
714 enc_algo = enc_algo.lower()
716 enc_key_length = {
717 'aes-128-cbc': 16,
718 'aes-128': 16,
719 'aes-192-cbc': 24,
720 'aes-192': 24,
721 'aes-256-cbc': 32,
722 'aes-256': 32,
723 'rc4': 16,
724 'rc4-64': 8,
725 'rc4-40': 5,
726 'rc2-64-cbc': 8,
727 'rc2-40-cbc': 5,
728 'rc2-cbc': 16,
729 'rc2': 16,
730 'des-ede3-cbc': 24,
731 'des-ede3': 24,
732 'des3': 24,
733 'des-ede-cbc': 16,
734 'des-cbc': 8,
735 'des': 8,
736 }[enc_algo]
738 enc_key = hashlib.md5(password + enc_iv[0:8]).digest()
739 while enc_key_length > len(enc_key):
740 enc_key += hashlib.md5(enc_key + password + enc_iv[0:8]).digest()
741 enc_key = enc_key[0:enc_key_length]
743 enc_algo_name = {
744 'aes-128-cbc': 'aes',
745 'aes-128': 'aes',
746 'aes-192-cbc': 'aes',
747 'aes-192': 'aes',
748 'aes-256-cbc': 'aes',
749 'aes-256': 'aes',
750 'rc4': 'rc4',
751 'rc4-64': 'rc4',
752 'rc4-40': 'rc4',
753 'rc2-64-cbc': 'rc2',
754 'rc2-40-cbc': 'rc2',
755 'rc2-cbc': 'rc2',
756 'rc2': 'rc2',
757 'des-ede3-cbc': 'tripledes',
758 'des-ede3': 'tripledes',
759 'des3': 'tripledes',
760 'des-ede-cbc': 'tripledes',
761 'des-cbc': 'des',
762 'des': 'des',
763 }[enc_algo]
764 decrypt_func = crypto_funcs[enc_algo_name]
766 if enc_algo_name == 'rc4':
767 return decrypt_func(enc_key, data)
769 return decrypt_func(enc_key, data, enc_iv)
772def _parse_pkcs12(data, password, load_private_key):
773 """
774 Parses a PKCS#12 ANS.1 DER-encoded structure and extracts certs and keys
776 :param data:
777 A byte string of a DER-encoded PKCS#12 file
779 :param password:
780 A byte string of the password to any encrypted data
782 :param load_private_key:
783 A callable that will accept a byte string and return an
784 oscrypto.asymmetric.PrivateKey object
786 :raises:
787 ValueError - when any of the parameters are of the wrong type or value
788 OSError - when an error is returned by one of the OS decryption functions
790 :return:
791 A three-element tuple of:
792 1. An asn1crypto.keys.PrivateKeyInfo object
793 2. An asn1crypto.x509.Certificate object
794 3. A list of zero or more asn1crypto.x509.Certificate objects that are
795 "extra" certificates, possibly intermediates from the cert chain
796 """
798 if not isinstance(data, byte_cls):
799 raise TypeError(pretty_message(
800 '''
801 data must be a byte string, not %s
802 ''',
803 type_name(data)
804 ))
806 if password is not None:
807 if not isinstance(password, byte_cls):
808 raise TypeError(pretty_message(
809 '''
810 password must be a byte string, not %s
811 ''',
812 type_name(password)
813 ))
814 else:
815 password = b''
817 certs = {}
818 private_keys = {}
820 pfx = Pfx.load(data)
822 auth_safe = pfx['auth_safe']
823 if auth_safe['content_type'].native != 'data':
824 raise ValueError(pretty_message(
825 '''
826 Only password-protected PKCS12 files are currently supported
827 '''
828 ))
829 authenticated_safe = pfx.authenticated_safe
831 mac_data = pfx['mac_data']
832 if mac_data:
833 mac_algo = mac_data['mac']['digest_algorithm']['algorithm'].native
834 key_length = {
835 'sha1': 20,
836 'sha224': 28,
837 'sha256': 32,
838 'sha384': 48,
839 'sha512': 64,
840 'sha512_224': 28,
841 'sha512_256': 32,
842 }[mac_algo]
843 mac_key = pkcs12_kdf(
844 mac_algo,
845 password,
846 mac_data['mac_salt'].native,
847 mac_data['iterations'].native,
848 key_length,
849 3 # ID 3 is for generating an HMAC key
850 )
851 hash_mod = getattr(hashlib, mac_algo)
852 computed_hmac = hmac.new(mac_key, auth_safe['content'].contents, hash_mod).digest()
853 stored_hmac = mac_data['mac']['digest'].native
854 if not constant_compare(computed_hmac, stored_hmac):
855 raise ValueError('Password provided is invalid')
857 for content_info in authenticated_safe:
858 content = content_info['content']
860 if isinstance(content, OctetString):
861 _parse_safe_contents(content.native, certs, private_keys, password, load_private_key)
863 elif isinstance(content, EncryptedData):
864 encrypted_content_info = content['encrypted_content_info']
866 encryption_algorithm_info = encrypted_content_info['content_encryption_algorithm']
867 encrypted_content = encrypted_content_info['encrypted_content'].native
868 decrypted_content = _decrypt_encrypted_data(encryption_algorithm_info, encrypted_content, password)
870 _parse_safe_contents(decrypted_content, certs, private_keys, password, load_private_key)
872 else:
873 raise ValueError(pretty_message(
874 '''
875 Public-key-based PKCS12 files are not currently supported
876 '''
877 ))
879 key_fingerprints = set(private_keys.keys())
880 cert_fingerprints = set(certs.keys())
882 common_fingerprints = sorted(list(key_fingerprints & cert_fingerprints))
884 key = None
885 cert = None
886 other_certs = []
888 if len(common_fingerprints) >= 1:
889 fingerprint = common_fingerprints[0]
890 key = private_keys[fingerprint]
891 cert = certs[fingerprint]
892 other_certs = [certs[f] for f in certs if f != fingerprint]
893 return (key, cert, other_certs)
895 if len(private_keys) > 0:
896 first_key = sorted(list(private_keys.keys()))[0]
897 key = private_keys[first_key]
899 if len(certs) > 0:
900 first_key = sorted(list(certs.keys()))[0]
901 cert = certs[first_key]
902 del certs[first_key]
904 if len(certs) > 0:
905 other_certs = sorted(list(certs.values()), key=lambda c: c.subject.human_friendly)
907 return (key, cert, other_certs)
910def _parse_safe_contents(safe_contents, certs, private_keys, password, load_private_key):
911 """
912 Parses a SafeContents PKCS#12 ANS.1 structure and extracts certs and keys
914 :param safe_contents:
915 A byte string of ber-encoded SafeContents, or a asn1crypto.pkcs12.SafeContents
916 parsed object
918 :param certs:
919 A dict to store certificates in
921 :param keys:
922 A dict to store keys in
924 :param password:
925 A byte string of the password to any encrypted data
927 :param load_private_key:
928 A callable that will accept a byte string and return an
929 oscrypto.asymmetric.PrivateKey object
930 """
932 if isinstance(safe_contents, byte_cls):
933 safe_contents = SafeContents.load(safe_contents)
935 for safe_bag in safe_contents:
936 bag_value = safe_bag['bag_value']
938 if isinstance(bag_value, CertBag):
939 if bag_value['cert_id'].native == 'x509':
940 cert = bag_value['cert_value'].parsed
941 public_key_info = cert['tbs_certificate']['subject_public_key_info']
942 certs[_fingerprint(public_key_info, None)] = bag_value['cert_value'].parsed
944 elif isinstance(bag_value, PrivateKeyInfo):
945 private_keys[_fingerprint(bag_value, load_private_key)] = bag_value
947 elif isinstance(bag_value, EncryptedPrivateKeyInfo):
948 encryption_algorithm_info = bag_value['encryption_algorithm']
949 encrypted_key_bytes = bag_value['encrypted_data'].native
950 decrypted_key_bytes = _decrypt_encrypted_data(encryption_algorithm_info, encrypted_key_bytes, password)
951 private_key = PrivateKeyInfo.load(decrypted_key_bytes)
952 private_keys[_fingerprint(private_key, load_private_key)] = private_key
954 elif isinstance(bag_value, SafeContents):
955 _parse_safe_contents(bag_value, certs, private_keys, password, load_private_key)
957 else:
958 # We don't care about CRL bags or secret bags
959 pass
962def _decrypt_encrypted_data(encryption_algorithm_info, encrypted_content, password):
963 """
964 Decrypts encrypted ASN.1 data
966 :param encryption_algorithm_info:
967 An instance of asn1crypto.pkcs5.Pkcs5EncryptionAlgorithm
969 :param encrypted_content:
970 A byte string of the encrypted content
972 :param password:
973 A byte string of the encrypted content's password
975 :return:
976 A byte string of the decrypted plaintext
977 """
979 decrypt_func = crypto_funcs[encryption_algorithm_info.encryption_cipher]
981 # Modern, PKCS#5 PBES2-based encryption
982 if encryption_algorithm_info.kdf == 'pbkdf2':
984 if encryption_algorithm_info.encryption_cipher == 'rc5':
985 raise ValueError(pretty_message(
986 '''
987 PBES2 encryption scheme utilizing RC5 encryption is not supported
988 '''
989 ))
991 enc_key = pbkdf2(
992 encryption_algorithm_info.kdf_hmac,
993 password,
994 encryption_algorithm_info.kdf_salt,
995 encryption_algorithm_info.kdf_iterations,
996 encryption_algorithm_info.key_length
997 )
998 enc_iv = encryption_algorithm_info.encryption_iv
1000 plaintext = decrypt_func(enc_key, encrypted_content, enc_iv)
1002 elif encryption_algorithm_info.kdf == 'pbkdf1':
1003 derived_output = pbkdf1(
1004 encryption_algorithm_info.kdf_hmac,
1005 password,
1006 encryption_algorithm_info.kdf_salt,
1007 encryption_algorithm_info.kdf_iterations,
1008 encryption_algorithm_info.key_length + 8
1009 )
1010 enc_key = derived_output[0:8]
1011 enc_iv = derived_output[8:16]
1013 plaintext = decrypt_func(enc_key, encrypted_content, enc_iv)
1015 elif encryption_algorithm_info.kdf == 'pkcs12_kdf':
1016 enc_key = pkcs12_kdf(
1017 encryption_algorithm_info.kdf_hmac,
1018 password,
1019 encryption_algorithm_info.kdf_salt,
1020 encryption_algorithm_info.kdf_iterations,
1021 encryption_algorithm_info.key_length,
1022 1 # ID 1 is for generating a key
1023 )
1025 # Since RC4 is a stream cipher, we don't use an IV
1026 if encryption_algorithm_info.encryption_cipher == 'rc4':
1027 plaintext = decrypt_func(enc_key, encrypted_content)
1029 else:
1030 enc_iv = pkcs12_kdf(
1031 encryption_algorithm_info.kdf_hmac,
1032 password,
1033 encryption_algorithm_info.kdf_salt,
1034 encryption_algorithm_info.kdf_iterations,
1035 encryption_algorithm_info.encryption_block_size,
1036 2 # ID 2 is for generating an IV
1037 )
1038 plaintext = decrypt_func(enc_key, encrypted_content, enc_iv)
1040 return plaintext