Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/rsa.py: 39%
262 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:50 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:50 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import threading
8import typing
10from cryptography.exceptions import (
11 InvalidSignature,
12 UnsupportedAlgorithm,
13 _Reasons,
14)
15from cryptography.hazmat.backends.openssl.utils import (
16 _calculate_digest_and_algorithm,
17)
18from cryptography.hazmat.primitives import hashes, serialization
19from cryptography.hazmat.primitives.asymmetric import utils as asym_utils
20from cryptography.hazmat.primitives.asymmetric.padding import (
21 MGF1,
22 OAEP,
23 PSS,
24 AsymmetricPadding,
25 PKCS1v15,
26 _Auto,
27 _DigestLength,
28 _MaxLength,
29 calculate_max_pss_salt_length,
30)
31from cryptography.hazmat.primitives.asymmetric.rsa import (
32 RSAPrivateKey,
33 RSAPrivateNumbers,
34 RSAPublicKey,
35 RSAPublicNumbers,
36)
38if typing.TYPE_CHECKING:
39 from cryptography.hazmat.backends.openssl.backend import Backend
42def _get_rsa_pss_salt_length(
43 backend: Backend,
44 pss: PSS,
45 key: typing.Union[RSAPrivateKey, RSAPublicKey],
46 hash_algorithm: hashes.HashAlgorithm,
47) -> int:
48 salt = pss._salt_length
50 if isinstance(salt, _MaxLength):
51 return calculate_max_pss_salt_length(key, hash_algorithm)
52 elif isinstance(salt, _DigestLength):
53 return hash_algorithm.digest_size
54 elif isinstance(salt, _Auto):
55 if isinstance(key, RSAPrivateKey):
56 raise ValueError(
57 "PSS salt length can only be set to AUTO when verifying"
58 )
59 return backend._lib.RSA_PSS_SALTLEN_AUTO
60 else:
61 return salt
64def _enc_dec_rsa(
65 backend: Backend,
66 key: typing.Union[_RSAPrivateKey, _RSAPublicKey],
67 data: bytes,
68 padding: AsymmetricPadding,
69) -> bytes:
70 if not isinstance(padding, AsymmetricPadding):
71 raise TypeError("Padding must be an instance of AsymmetricPadding.")
73 if isinstance(padding, PKCS1v15):
74 padding_enum = backend._lib.RSA_PKCS1_PADDING
75 elif isinstance(padding, OAEP):
76 padding_enum = backend._lib.RSA_PKCS1_OAEP_PADDING
78 if not isinstance(padding._mgf, MGF1):
79 raise UnsupportedAlgorithm(
80 "Only MGF1 is supported by this backend.",
81 _Reasons.UNSUPPORTED_MGF,
82 )
84 if not backend.rsa_padding_supported(padding):
85 raise UnsupportedAlgorithm(
86 "This combination of padding and hash algorithm is not "
87 "supported by this backend.",
88 _Reasons.UNSUPPORTED_PADDING,
89 )
91 else:
92 raise UnsupportedAlgorithm(
93 f"{padding.name} is not supported by this backend.",
94 _Reasons.UNSUPPORTED_PADDING,
95 )
97 return _enc_dec_rsa_pkey_ctx(backend, key, data, padding_enum, padding)
100def _enc_dec_rsa_pkey_ctx(
101 backend: Backend,
102 key: typing.Union[_RSAPrivateKey, _RSAPublicKey],
103 data: bytes,
104 padding_enum: int,
105 padding: AsymmetricPadding,
106) -> bytes:
107 init: typing.Callable[[typing.Any], int]
108 crypt: typing.Callable[[typing.Any, typing.Any, int, bytes, int], int]
109 if isinstance(key, _RSAPublicKey):
110 init = backend._lib.EVP_PKEY_encrypt_init
111 crypt = backend._lib.EVP_PKEY_encrypt
112 else:
113 init = backend._lib.EVP_PKEY_decrypt_init
114 crypt = backend._lib.EVP_PKEY_decrypt
116 pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
117 backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
118 pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
119 res = init(pkey_ctx)
120 backend.openssl_assert(res == 1)
121 res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
122 backend.openssl_assert(res > 0)
123 buf_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
124 backend.openssl_assert(buf_size > 0)
125 if isinstance(padding, OAEP):
126 mgf1_md = backend._evp_md_non_null_from_algorithm(
127 padding._mgf._algorithm
128 )
129 res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
130 backend.openssl_assert(res > 0)
131 oaep_md = backend._evp_md_non_null_from_algorithm(padding._algorithm)
132 res = backend._lib.EVP_PKEY_CTX_set_rsa_oaep_md(pkey_ctx, oaep_md)
133 backend.openssl_assert(res > 0)
135 if (
136 isinstance(padding, OAEP)
137 and padding._label is not None
138 and len(padding._label) > 0
139 ):
140 # set0_rsa_oaep_label takes ownership of the char * so we need to
141 # copy it into some new memory
142 labelptr = backend._lib.OPENSSL_malloc(len(padding._label))
143 backend.openssl_assert(labelptr != backend._ffi.NULL)
144 backend._ffi.memmove(labelptr, padding._label, len(padding._label))
145 res = backend._lib.EVP_PKEY_CTX_set0_rsa_oaep_label(
146 pkey_ctx, labelptr, len(padding._label)
147 )
148 backend.openssl_assert(res == 1)
150 outlen = backend._ffi.new("size_t *", buf_size)
151 buf = backend._ffi.new("unsigned char[]", buf_size)
152 # Everything from this line onwards is written with the goal of being as
153 # constant-time as is practical given the constraints of Python and our
154 # API. See Bleichenbacher's '98 attack on RSA, and its many many variants.
155 # As such, you should not attempt to change this (particularly to "clean it
156 # up") without understanding why it was written this way (see
157 # Chesterton's Fence), and without measuring to verify you have not
158 # introduced observable time differences.
159 res = crypt(pkey_ctx, buf, outlen, data, len(data))
160 resbuf = backend._ffi.buffer(buf)[: outlen[0]]
161 backend._lib.ERR_clear_error()
162 if res <= 0:
163 raise ValueError("Encryption/decryption failed.")
164 return resbuf
167def _rsa_sig_determine_padding(
168 backend: Backend,
169 key: typing.Union[_RSAPrivateKey, _RSAPublicKey],
170 padding: AsymmetricPadding,
171 algorithm: typing.Optional[hashes.HashAlgorithm],
172) -> int:
173 if not isinstance(padding, AsymmetricPadding):
174 raise TypeError("Expected provider of AsymmetricPadding.")
176 pkey_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
177 backend.openssl_assert(pkey_size > 0)
179 if isinstance(padding, PKCS1v15):
180 # Hash algorithm is ignored for PKCS1v15-padding, may be None.
181 padding_enum = backend._lib.RSA_PKCS1_PADDING
182 elif isinstance(padding, PSS):
183 if not isinstance(padding._mgf, MGF1):
184 raise UnsupportedAlgorithm(
185 "Only MGF1 is supported by this backend.",
186 _Reasons.UNSUPPORTED_MGF,
187 )
189 # PSS padding requires a hash algorithm
190 if not isinstance(algorithm, hashes.HashAlgorithm):
191 raise TypeError("Expected instance of hashes.HashAlgorithm.")
193 # Size of key in bytes - 2 is the maximum
194 # PSS signature length (salt length is checked later)
195 if pkey_size - algorithm.digest_size - 2 < 0:
196 raise ValueError(
197 "Digest too large for key size. Use a larger "
198 "key or different digest."
199 )
201 padding_enum = backend._lib.RSA_PKCS1_PSS_PADDING
202 else:
203 raise UnsupportedAlgorithm(
204 f"{padding.name} is not supported by this backend.",
205 _Reasons.UNSUPPORTED_PADDING,
206 )
208 return padding_enum
211# Hash algorithm can be absent (None) to initialize the context without setting
212# any message digest algorithm. This is currently only valid for the PKCS1v15
213# padding type, where it means that the signature data is encoded/decoded
214# as provided, without being wrapped in a DigestInfo structure.
215def _rsa_sig_setup(
216 backend: Backend,
217 padding: AsymmetricPadding,
218 algorithm: typing.Optional[hashes.HashAlgorithm],
219 key: typing.Union[_RSAPublicKey, _RSAPrivateKey],
220 init_func: typing.Callable[[typing.Any], int],
221):
222 padding_enum = _rsa_sig_determine_padding(backend, key, padding, algorithm)
223 pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
224 backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
225 pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
226 res = init_func(pkey_ctx)
227 if res != 1:
228 errors = backend._consume_errors()
229 raise ValueError("Unable to sign/verify with this key", errors)
231 if algorithm is not None:
232 evp_md = backend._evp_md_non_null_from_algorithm(algorithm)
233 res = backend._lib.EVP_PKEY_CTX_set_signature_md(pkey_ctx, evp_md)
234 if res <= 0:
235 backend._consume_errors()
236 raise UnsupportedAlgorithm(
237 "{} is not supported by this backend for RSA signing.".format(
238 algorithm.name
239 ),
240 _Reasons.UNSUPPORTED_HASH,
241 )
242 res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
243 if res <= 0:
244 backend._consume_errors()
245 raise UnsupportedAlgorithm(
246 "{} is not supported for the RSA signature operation.".format(
247 padding.name
248 ),
249 _Reasons.UNSUPPORTED_PADDING,
250 )
251 if isinstance(padding, PSS):
252 assert isinstance(algorithm, hashes.HashAlgorithm)
253 res = backend._lib.EVP_PKEY_CTX_set_rsa_pss_saltlen(
254 pkey_ctx,
255 _get_rsa_pss_salt_length(backend, padding, key, algorithm),
256 )
257 backend.openssl_assert(res > 0)
259 mgf1_md = backend._evp_md_non_null_from_algorithm(
260 padding._mgf._algorithm
261 )
262 res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
263 backend.openssl_assert(res > 0)
265 return pkey_ctx
268def _rsa_sig_sign(
269 backend: Backend,
270 padding: AsymmetricPadding,
271 algorithm: hashes.HashAlgorithm,
272 private_key: _RSAPrivateKey,
273 data: bytes,
274) -> bytes:
275 pkey_ctx = _rsa_sig_setup(
276 backend,
277 padding,
278 algorithm,
279 private_key,
280 backend._lib.EVP_PKEY_sign_init,
281 )
282 buflen = backend._ffi.new("size_t *")
283 res = backend._lib.EVP_PKEY_sign(
284 pkey_ctx, backend._ffi.NULL, buflen, data, len(data)
285 )
286 backend.openssl_assert(res == 1)
287 buf = backend._ffi.new("unsigned char[]", buflen[0])
288 res = backend._lib.EVP_PKEY_sign(pkey_ctx, buf, buflen, data, len(data))
289 if res != 1:
290 errors = backend._consume_errors()
291 raise ValueError(
292 "Digest or salt length too long for key size. Use a larger key "
293 "or shorter salt length if you are specifying a PSS salt",
294 errors,
295 )
297 return backend._ffi.buffer(buf)[:]
300def _rsa_sig_verify(
301 backend: Backend,
302 padding: AsymmetricPadding,
303 algorithm: hashes.HashAlgorithm,
304 public_key: _RSAPublicKey,
305 signature: bytes,
306 data: bytes,
307) -> None:
308 pkey_ctx = _rsa_sig_setup(
309 backend,
310 padding,
311 algorithm,
312 public_key,
313 backend._lib.EVP_PKEY_verify_init,
314 )
315 res = backend._lib.EVP_PKEY_verify(
316 pkey_ctx, signature, len(signature), data, len(data)
317 )
318 # The previous call can return negative numbers in the event of an
319 # error. This is not a signature failure but we need to fail if it
320 # occurs.
321 backend.openssl_assert(res >= 0)
322 if res == 0:
323 backend._consume_errors()
324 raise InvalidSignature
327def _rsa_sig_recover(
328 backend: Backend,
329 padding: AsymmetricPadding,
330 algorithm: typing.Optional[hashes.HashAlgorithm],
331 public_key: _RSAPublicKey,
332 signature: bytes,
333) -> bytes:
334 pkey_ctx = _rsa_sig_setup(
335 backend,
336 padding,
337 algorithm,
338 public_key,
339 backend._lib.EVP_PKEY_verify_recover_init,
340 )
342 # Attempt to keep the rest of the code in this function as constant/time
343 # as possible. See the comment in _enc_dec_rsa_pkey_ctx. Note that the
344 # buflen parameter is used even though its value may be undefined in the
345 # error case. Due to the tolerant nature of Python slicing this does not
346 # trigger any exceptions.
347 maxlen = backend._lib.EVP_PKEY_size(public_key._evp_pkey)
348 backend.openssl_assert(maxlen > 0)
349 buf = backend._ffi.new("unsigned char[]", maxlen)
350 buflen = backend._ffi.new("size_t *", maxlen)
351 res = backend._lib.EVP_PKEY_verify_recover(
352 pkey_ctx, buf, buflen, signature, len(signature)
353 )
354 resbuf = backend._ffi.buffer(buf)[: buflen[0]]
355 backend._lib.ERR_clear_error()
356 # Assume that all parameter errors are handled during the setup phase and
357 # any error here is due to invalid signature.
358 if res != 1:
359 raise InvalidSignature
360 return resbuf
363class _RSAPrivateKey(RSAPrivateKey):
364 _evp_pkey: object
365 _rsa_cdata: object
366 _key_size: int
368 def __init__(
369 self,
370 backend: Backend,
371 rsa_cdata,
372 evp_pkey,
373 *,
374 unsafe_skip_rsa_key_validation: bool,
375 ):
376 res: int
377 # RSA_check_key is slower in OpenSSL 3.0.0 due to improved
378 # primality checking. In normal use this is unlikely to be a problem
379 # since users don't load new keys constantly, but for TESTING we've
380 # added an init arg that allows skipping the checks. You should not
381 # use this in production code unless you understand the consequences.
382 if not unsafe_skip_rsa_key_validation:
383 res = backend._lib.RSA_check_key(rsa_cdata)
384 if res != 1:
385 errors = backend._consume_errors()
386 raise ValueError("Invalid private key", errors)
387 # 2 is prime and passes an RSA key check, so we also check
388 # if p and q are odd just to be safe.
389 p = backend._ffi.new("BIGNUM **")
390 q = backend._ffi.new("BIGNUM **")
391 backend._lib.RSA_get0_factors(rsa_cdata, p, q)
392 backend.openssl_assert(p[0] != backend._ffi.NULL)
393 backend.openssl_assert(q[0] != backend._ffi.NULL)
394 p_odd = backend._lib.BN_is_odd(p[0])
395 q_odd = backend._lib.BN_is_odd(q[0])
396 if p_odd != 1 or q_odd != 1:
397 errors = backend._consume_errors()
398 raise ValueError("Invalid private key", errors)
400 self._backend = backend
401 self._rsa_cdata = rsa_cdata
402 self._evp_pkey = evp_pkey
403 # Used for lazy blinding
404 self._blinded = False
405 self._blinding_lock = threading.Lock()
407 n = self._backend._ffi.new("BIGNUM **")
408 self._backend._lib.RSA_get0_key(
409 self._rsa_cdata,
410 n,
411 self._backend._ffi.NULL,
412 self._backend._ffi.NULL,
413 )
414 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
415 self._key_size = self._backend._lib.BN_num_bits(n[0])
417 def _enable_blinding(self) -> None:
418 # If you call blind on an already blinded RSA key OpenSSL will turn
419 # it off and back on, which is a performance hit we want to avoid.
420 if not self._blinded:
421 with self._blinding_lock:
422 self._non_threadsafe_enable_blinding()
424 def _non_threadsafe_enable_blinding(self) -> None:
425 # This is only a separate function to allow for testing to cover both
426 # branches. It should never be invoked except through _enable_blinding.
427 # Check if it's not True again in case another thread raced past the
428 # first non-locked check.
429 if not self._blinded:
430 res = self._backend._lib.RSA_blinding_on(
431 self._rsa_cdata, self._backend._ffi.NULL
432 )
433 self._backend.openssl_assert(res == 1)
434 self._blinded = True
436 @property
437 def key_size(self) -> int:
438 return self._key_size
440 def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
441 self._enable_blinding()
442 key_size_bytes = (self.key_size + 7) // 8
443 if key_size_bytes != len(ciphertext):
444 raise ValueError("Ciphertext length must be equal to key size.")
446 return _enc_dec_rsa(self._backend, self, ciphertext, padding)
448 def public_key(self) -> RSAPublicKey:
449 ctx = self._backend._lib.RSAPublicKey_dup(self._rsa_cdata)
450 self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
451 ctx = self._backend._ffi.gc(ctx, self._backend._lib.RSA_free)
452 evp_pkey = self._backend._rsa_cdata_to_evp_pkey(ctx)
453 return _RSAPublicKey(self._backend, ctx, evp_pkey)
455 def private_numbers(self) -> RSAPrivateNumbers:
456 n = self._backend._ffi.new("BIGNUM **")
457 e = self._backend._ffi.new("BIGNUM **")
458 d = self._backend._ffi.new("BIGNUM **")
459 p = self._backend._ffi.new("BIGNUM **")
460 q = self._backend._ffi.new("BIGNUM **")
461 dmp1 = self._backend._ffi.new("BIGNUM **")
462 dmq1 = self._backend._ffi.new("BIGNUM **")
463 iqmp = self._backend._ffi.new("BIGNUM **")
464 self._backend._lib.RSA_get0_key(self._rsa_cdata, n, e, d)
465 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
466 self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
467 self._backend.openssl_assert(d[0] != self._backend._ffi.NULL)
468 self._backend._lib.RSA_get0_factors(self._rsa_cdata, p, q)
469 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
470 self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
471 self._backend._lib.RSA_get0_crt_params(
472 self._rsa_cdata, dmp1, dmq1, iqmp
473 )
474 self._backend.openssl_assert(dmp1[0] != self._backend._ffi.NULL)
475 self._backend.openssl_assert(dmq1[0] != self._backend._ffi.NULL)
476 self._backend.openssl_assert(iqmp[0] != self._backend._ffi.NULL)
477 return RSAPrivateNumbers(
478 p=self._backend._bn_to_int(p[0]),
479 q=self._backend._bn_to_int(q[0]),
480 d=self._backend._bn_to_int(d[0]),
481 dmp1=self._backend._bn_to_int(dmp1[0]),
482 dmq1=self._backend._bn_to_int(dmq1[0]),
483 iqmp=self._backend._bn_to_int(iqmp[0]),
484 public_numbers=RSAPublicNumbers(
485 e=self._backend._bn_to_int(e[0]),
486 n=self._backend._bn_to_int(n[0]),
487 ),
488 )
490 def private_bytes(
491 self,
492 encoding: serialization.Encoding,
493 format: serialization.PrivateFormat,
494 encryption_algorithm: serialization.KeySerializationEncryption,
495 ) -> bytes:
496 return self._backend._private_key_bytes(
497 encoding,
498 format,
499 encryption_algorithm,
500 self,
501 self._evp_pkey,
502 self._rsa_cdata,
503 )
505 def sign(
506 self,
507 data: bytes,
508 padding: AsymmetricPadding,
509 algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
510 ) -> bytes:
511 self._enable_blinding()
512 data, algorithm = _calculate_digest_and_algorithm(data, algorithm)
513 return _rsa_sig_sign(self._backend, padding, algorithm, self, data)
516class _RSAPublicKey(RSAPublicKey):
517 _evp_pkey: object
518 _rsa_cdata: object
519 _key_size: int
521 def __init__(self, backend: Backend, rsa_cdata, evp_pkey):
522 self._backend = backend
523 self._rsa_cdata = rsa_cdata
524 self._evp_pkey = evp_pkey
526 n = self._backend._ffi.new("BIGNUM **")
527 self._backend._lib.RSA_get0_key(
528 self._rsa_cdata,
529 n,
530 self._backend._ffi.NULL,
531 self._backend._ffi.NULL,
532 )
533 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
534 self._key_size = self._backend._lib.BN_num_bits(n[0])
536 @property
537 def key_size(self) -> int:
538 return self._key_size
540 def __eq__(self, other: object) -> bool:
541 if not isinstance(other, _RSAPublicKey):
542 return NotImplemented
544 return (
545 self._backend._lib.EVP_PKEY_cmp(self._evp_pkey, other._evp_pkey)
546 == 1
547 )
549 def encrypt(self, plaintext: bytes, padding: AsymmetricPadding) -> bytes:
550 return _enc_dec_rsa(self._backend, self, plaintext, padding)
552 def public_numbers(self) -> RSAPublicNumbers:
553 n = self._backend._ffi.new("BIGNUM **")
554 e = self._backend._ffi.new("BIGNUM **")
555 self._backend._lib.RSA_get0_key(
556 self._rsa_cdata, n, e, self._backend._ffi.NULL
557 )
558 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
559 self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
560 return RSAPublicNumbers(
561 e=self._backend._bn_to_int(e[0]),
562 n=self._backend._bn_to_int(n[0]),
563 )
565 def public_bytes(
566 self,
567 encoding: serialization.Encoding,
568 format: serialization.PublicFormat,
569 ) -> bytes:
570 return self._backend._public_key_bytes(
571 encoding, format, self, self._evp_pkey, self._rsa_cdata
572 )
574 def verify(
575 self,
576 signature: bytes,
577 data: bytes,
578 padding: AsymmetricPadding,
579 algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
580 ) -> None:
581 data, algorithm = _calculate_digest_and_algorithm(data, algorithm)
582 _rsa_sig_verify(
583 self._backend, padding, algorithm, self, signature, data
584 )
586 def recover_data_from_signature(
587 self,
588 signature: bytes,
589 padding: AsymmetricPadding,
590 algorithm: typing.Optional[hashes.HashAlgorithm],
591 ) -> bytes:
592 if isinstance(algorithm, asym_utils.Prehashed):
593 raise TypeError(
594 "Prehashed is only supported in the sign and verify methods. "
595 "It cannot be used with recover_data_from_signature."
596 )
597 return _rsa_sig_recover(
598 self._backend, padding, algorithm, self, signature
599 )