Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/rsa.py: 17%
257 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5import threading
6import typing
8from cryptography.exceptions import (
9 InvalidSignature,
10 UnsupportedAlgorithm,
11 _Reasons,
12)
13from cryptography.hazmat.backends.openssl.utils import (
14 _calculate_digest_and_algorithm,
15)
16from cryptography.hazmat.primitives import hashes, serialization
17from cryptography.hazmat.primitives.asymmetric import (
18 utils as asym_utils,
19)
20from cryptography.hazmat.primitives.asymmetric.padding import (
21 AsymmetricPadding,
22 MGF1,
23 OAEP,
24 PKCS1v15,
25 PSS,
26 _Auto,
27 _DigestLength,
28 _MaxLength,
29 calculate_max_pss_salt_length,
30)
31from cryptography.hazmat.primitives.asymmetric.rsa import (
32 RSAPrivateKey,
33 RSAPrivateNumbers,
34 RSAPublicKey,
35 RSAPublicNumbers,
36)
39if typing.TYPE_CHECKING:
40 from cryptography.hazmat.backends.openssl.backend import Backend
43def _get_rsa_pss_salt_length(
44 backend: "Backend",
45 pss: PSS,
46 key: typing.Union[RSAPrivateKey, RSAPublicKey],
47 hash_algorithm: hashes.HashAlgorithm,
48) -> int:
49 salt = pss._salt_length
51 if isinstance(salt, _MaxLength):
52 return calculate_max_pss_salt_length(key, hash_algorithm)
53 elif isinstance(salt, _DigestLength):
54 return hash_algorithm.digest_size
55 elif isinstance(salt, _Auto):
56 if isinstance(key, RSAPrivateKey):
57 raise ValueError(
58 "PSS salt length can only be set to AUTO when verifying"
59 )
60 return backend._lib.RSA_PSS_SALTLEN_AUTO
61 else:
62 return salt
65def _enc_dec_rsa(
66 backend: "Backend",
67 key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
68 data: bytes,
69 padding: AsymmetricPadding,
70) -> bytes:
71 if not isinstance(padding, AsymmetricPadding):
72 raise TypeError("Padding must be an instance of AsymmetricPadding.")
74 if isinstance(padding, PKCS1v15):
75 padding_enum = backend._lib.RSA_PKCS1_PADDING
76 elif isinstance(padding, OAEP):
77 padding_enum = backend._lib.RSA_PKCS1_OAEP_PADDING
79 if not isinstance(padding._mgf, MGF1):
80 raise UnsupportedAlgorithm(
81 "Only MGF1 is supported by this backend.",
82 _Reasons.UNSUPPORTED_MGF,
83 )
85 if not backend.rsa_padding_supported(padding):
86 raise UnsupportedAlgorithm(
87 "This combination of padding and hash algorithm is not "
88 "supported by this backend.",
89 _Reasons.UNSUPPORTED_PADDING,
90 )
92 else:
93 raise UnsupportedAlgorithm(
94 "{} is not supported by this backend.".format(padding.name),
95 _Reasons.UNSUPPORTED_PADDING,
96 )
98 return _enc_dec_rsa_pkey_ctx(backend, key, data, padding_enum, padding)
101def _enc_dec_rsa_pkey_ctx(
102 backend: "Backend",
103 key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
104 data: bytes,
105 padding_enum: int,
106 padding: AsymmetricPadding,
107) -> bytes:
108 init: typing.Callable[[typing.Any], int]
109 crypt: typing.Callable[[typing.Any, typing.Any, int, bytes, int], int]
110 if isinstance(key, _RSAPublicKey):
111 init = backend._lib.EVP_PKEY_encrypt_init
112 crypt = backend._lib.EVP_PKEY_encrypt
113 else:
114 init = backend._lib.EVP_PKEY_decrypt_init
115 crypt = backend._lib.EVP_PKEY_decrypt
117 pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
118 backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
119 pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
120 res = init(pkey_ctx)
121 backend.openssl_assert(res == 1)
122 res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
123 backend.openssl_assert(res > 0)
124 buf_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
125 backend.openssl_assert(buf_size > 0)
126 if isinstance(padding, OAEP):
127 mgf1_md = backend._evp_md_non_null_from_algorithm(
128 padding._mgf._algorithm
129 )
130 res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
131 backend.openssl_assert(res > 0)
132 oaep_md = backend._evp_md_non_null_from_algorithm(padding._algorithm)
133 res = backend._lib.EVP_PKEY_CTX_set_rsa_oaep_md(pkey_ctx, oaep_md)
134 backend.openssl_assert(res > 0)
136 if (
137 isinstance(padding, OAEP)
138 and padding._label is not None
139 and len(padding._label) > 0
140 ):
141 # set0_rsa_oaep_label takes ownership of the char * so we need to
142 # copy it into some new memory
143 labelptr = backend._lib.OPENSSL_malloc(len(padding._label))
144 backend.openssl_assert(labelptr != backend._ffi.NULL)
145 backend._ffi.memmove(labelptr, padding._label, len(padding._label))
146 res = backend._lib.EVP_PKEY_CTX_set0_rsa_oaep_label(
147 pkey_ctx, labelptr, len(padding._label)
148 )
149 backend.openssl_assert(res == 1)
151 outlen = backend._ffi.new("size_t *", buf_size)
152 buf = backend._ffi.new("unsigned char[]", buf_size)
153 # Everything from this line onwards is written with the goal of being as
154 # constant-time as is practical given the constraints of Python and our
155 # API. See Bleichenbacher's '98 attack on RSA, and its many many variants.
156 # As such, you should not attempt to change this (particularly to "clean it
157 # up") without understanding why it was written this way (see
158 # Chesterton's Fence), and without measuring to verify you have not
159 # introduced observable time differences.
160 res = crypt(pkey_ctx, buf, outlen, data, len(data))
161 resbuf = backend._ffi.buffer(buf)[: outlen[0]]
162 backend._lib.ERR_clear_error()
163 if res <= 0:
164 raise ValueError("Encryption/decryption failed.")
165 return resbuf
168def _rsa_sig_determine_padding(
169 backend: "Backend",
170 key: typing.Union["_RSAPrivateKey", "_RSAPublicKey"],
171 padding: AsymmetricPadding,
172 algorithm: typing.Optional[hashes.HashAlgorithm],
173) -> int:
174 if not isinstance(padding, AsymmetricPadding):
175 raise TypeError("Expected provider of AsymmetricPadding.")
177 pkey_size = backend._lib.EVP_PKEY_size(key._evp_pkey)
178 backend.openssl_assert(pkey_size > 0)
180 if isinstance(padding, PKCS1v15):
181 # Hash algorithm is ignored for PKCS1v15-padding, may be None.
182 padding_enum = backend._lib.RSA_PKCS1_PADDING
183 elif isinstance(padding, PSS):
184 if not isinstance(padding._mgf, MGF1):
185 raise UnsupportedAlgorithm(
186 "Only MGF1 is supported by this backend.",
187 _Reasons.UNSUPPORTED_MGF,
188 )
190 # PSS padding requires a hash algorithm
191 if not isinstance(algorithm, hashes.HashAlgorithm):
192 raise TypeError("Expected instance of hashes.HashAlgorithm.")
194 # Size of key in bytes - 2 is the maximum
195 # PSS signature length (salt length is checked later)
196 if pkey_size - algorithm.digest_size - 2 < 0:
197 raise ValueError(
198 "Digest too large for key size. Use a larger "
199 "key or different digest."
200 )
202 padding_enum = backend._lib.RSA_PKCS1_PSS_PADDING
203 else:
204 raise UnsupportedAlgorithm(
205 "{} is not supported by this backend.".format(padding.name),
206 _Reasons.UNSUPPORTED_PADDING,
207 )
209 return padding_enum
212# Hash algorithm can be absent (None) to initialize the context without setting
213# any message digest algorithm. This is currently only valid for the PKCS1v15
214# padding type, where it means that the signature data is encoded/decoded
215# as provided, without being wrapped in a DigestInfo structure.
216def _rsa_sig_setup(
217 backend: "Backend",
218 padding: AsymmetricPadding,
219 algorithm: typing.Optional[hashes.HashAlgorithm],
220 key: typing.Union["_RSAPublicKey", "_RSAPrivateKey"],
221 init_func: typing.Callable[[typing.Any], int],
222):
223 padding_enum = _rsa_sig_determine_padding(backend, key, padding, algorithm)
224 pkey_ctx = backend._lib.EVP_PKEY_CTX_new(key._evp_pkey, backend._ffi.NULL)
225 backend.openssl_assert(pkey_ctx != backend._ffi.NULL)
226 pkey_ctx = backend._ffi.gc(pkey_ctx, backend._lib.EVP_PKEY_CTX_free)
227 res = init_func(pkey_ctx)
228 if res != 1:
229 errors = backend._consume_errors()
230 raise ValueError("Unable to sign/verify with this key", errors)
232 if algorithm is not None:
233 evp_md = backend._evp_md_non_null_from_algorithm(algorithm)
234 res = backend._lib.EVP_PKEY_CTX_set_signature_md(pkey_ctx, evp_md)
235 if res <= 0:
236 backend._consume_errors()
237 raise UnsupportedAlgorithm(
238 "{} is not supported by this backend for RSA signing.".format(
239 algorithm.name
240 ),
241 _Reasons.UNSUPPORTED_HASH,
242 )
243 res = backend._lib.EVP_PKEY_CTX_set_rsa_padding(pkey_ctx, padding_enum)
244 if res <= 0:
245 backend._consume_errors()
246 raise UnsupportedAlgorithm(
247 "{} is not supported for the RSA signature operation.".format(
248 padding.name
249 ),
250 _Reasons.UNSUPPORTED_PADDING,
251 )
252 if isinstance(padding, PSS):
253 assert isinstance(algorithm, hashes.HashAlgorithm)
254 res = backend._lib.EVP_PKEY_CTX_set_rsa_pss_saltlen(
255 pkey_ctx,
256 _get_rsa_pss_salt_length(backend, padding, key, algorithm),
257 )
258 backend.openssl_assert(res > 0)
260 mgf1_md = backend._evp_md_non_null_from_algorithm(
261 padding._mgf._algorithm
262 )
263 res = backend._lib.EVP_PKEY_CTX_set_rsa_mgf1_md(pkey_ctx, mgf1_md)
264 backend.openssl_assert(res > 0)
266 return pkey_ctx
269def _rsa_sig_sign(
270 backend: "Backend",
271 padding: AsymmetricPadding,
272 algorithm: hashes.HashAlgorithm,
273 private_key: "_RSAPrivateKey",
274 data: bytes,
275) -> bytes:
276 pkey_ctx = _rsa_sig_setup(
277 backend,
278 padding,
279 algorithm,
280 private_key,
281 backend._lib.EVP_PKEY_sign_init,
282 )
283 buflen = backend._ffi.new("size_t *")
284 res = backend._lib.EVP_PKEY_sign(
285 pkey_ctx, backend._ffi.NULL, buflen, data, len(data)
286 )
287 backend.openssl_assert(res == 1)
288 buf = backend._ffi.new("unsigned char[]", buflen[0])
289 res = backend._lib.EVP_PKEY_sign(pkey_ctx, buf, buflen, data, len(data))
290 if res != 1:
291 errors = backend._consume_errors_with_text()
292 raise ValueError(
293 "Digest or salt length too long for key size. Use a larger key "
294 "or shorter salt length if you are specifying a PSS salt",
295 errors,
296 )
298 return backend._ffi.buffer(buf)[:]
301def _rsa_sig_verify(
302 backend: "Backend",
303 padding: AsymmetricPadding,
304 algorithm: hashes.HashAlgorithm,
305 public_key: "_RSAPublicKey",
306 signature: bytes,
307 data: bytes,
308) -> None:
309 pkey_ctx = _rsa_sig_setup(
310 backend,
311 padding,
312 algorithm,
313 public_key,
314 backend._lib.EVP_PKEY_verify_init,
315 )
316 res = backend._lib.EVP_PKEY_verify(
317 pkey_ctx, signature, len(signature), data, len(data)
318 )
319 # The previous call can return negative numbers in the event of an
320 # error. This is not a signature failure but we need to fail if it
321 # occurs.
322 backend.openssl_assert(res >= 0)
323 if res == 0:
324 backend._consume_errors()
325 raise InvalidSignature
328def _rsa_sig_recover(
329 backend: "Backend",
330 padding: AsymmetricPadding,
331 algorithm: typing.Optional[hashes.HashAlgorithm],
332 public_key: "_RSAPublicKey",
333 signature: bytes,
334) -> bytes:
335 pkey_ctx = _rsa_sig_setup(
336 backend,
337 padding,
338 algorithm,
339 public_key,
340 backend._lib.EVP_PKEY_verify_recover_init,
341 )
343 # Attempt to keep the rest of the code in this function as constant/time
344 # as possible. See the comment in _enc_dec_rsa_pkey_ctx. Note that the
345 # buflen parameter is used even though its value may be undefined in the
346 # error case. Due to the tolerant nature of Python slicing this does not
347 # trigger any exceptions.
348 maxlen = backend._lib.EVP_PKEY_size(public_key._evp_pkey)
349 backend.openssl_assert(maxlen > 0)
350 buf = backend._ffi.new("unsigned char[]", maxlen)
351 buflen = backend._ffi.new("size_t *", maxlen)
352 res = backend._lib.EVP_PKEY_verify_recover(
353 pkey_ctx, buf, buflen, signature, len(signature)
354 )
355 resbuf = backend._ffi.buffer(buf)[: buflen[0]]
356 backend._lib.ERR_clear_error()
357 # Assume that all parameter errors are handled during the setup phase and
358 # any error here is due to invalid signature.
359 if res != 1:
360 raise InvalidSignature
361 return resbuf
364class _RSAPrivateKey(RSAPrivateKey):
365 _evp_pkey: object
366 _rsa_cdata: object
367 _key_size: int
369 def __init__(
370 self, backend: "Backend", rsa_cdata, evp_pkey, _skip_check_key: bool
371 ):
372 res: int
373 # RSA_check_key is slower in OpenSSL 3.0.0 due to improved
374 # primality checking. In normal use this is unlikely to be a problem
375 # since users don't load new keys constantly, but for TESTING we've
376 # added an init arg that allows skipping the checks. You should not
377 # use this in production code unless you understand the consequences.
378 if not _skip_check_key:
379 res = backend._lib.RSA_check_key(rsa_cdata)
380 if res != 1:
381 errors = backend._consume_errors_with_text()
382 raise ValueError("Invalid private key", errors)
383 # 2 is prime and passes an RSA key check, so we also check
384 # if p and q are odd just to be safe.
385 p = backend._ffi.new("BIGNUM **")
386 q = backend._ffi.new("BIGNUM **")
387 backend._lib.RSA_get0_factors(rsa_cdata, p, q)
388 backend.openssl_assert(p[0] != backend._ffi.NULL)
389 backend.openssl_assert(q[0] != backend._ffi.NULL)
390 p_odd = backend._lib.BN_is_odd(p[0])
391 q_odd = backend._lib.BN_is_odd(q[0])
392 if p_odd != 1 or q_odd != 1:
393 errors = backend._consume_errors_with_text()
394 raise ValueError("Invalid private key", errors)
396 self._backend = backend
397 self._rsa_cdata = rsa_cdata
398 self._evp_pkey = evp_pkey
399 # Used for lazy blinding
400 self._blinded = False
401 self._blinding_lock = threading.Lock()
403 n = self._backend._ffi.new("BIGNUM **")
404 self._backend._lib.RSA_get0_key(
405 self._rsa_cdata,
406 n,
407 self._backend._ffi.NULL,
408 self._backend._ffi.NULL,
409 )
410 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
411 self._key_size = self._backend._lib.BN_num_bits(n[0])
413 def _enable_blinding(self) -> None:
414 # If you call blind on an already blinded RSA key OpenSSL will turn
415 # it off and back on, which is a performance hit we want to avoid.
416 if not self._blinded:
417 with self._blinding_lock:
418 self._non_threadsafe_enable_blinding()
420 def _non_threadsafe_enable_blinding(self) -> None:
421 # This is only a separate function to allow for testing to cover both
422 # branches. It should never be invoked except through _enable_blinding.
423 # Check if it's not True again in case another thread raced past the
424 # first non-locked check.
425 if not self._blinded:
426 res = self._backend._lib.RSA_blinding_on(
427 self._rsa_cdata, self._backend._ffi.NULL
428 )
429 self._backend.openssl_assert(res == 1)
430 self._blinded = True
432 @property
433 def key_size(self) -> int:
434 return self._key_size
436 def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
437 self._enable_blinding()
438 key_size_bytes = (self.key_size + 7) // 8
439 if key_size_bytes != len(ciphertext):
440 raise ValueError("Ciphertext length must be equal to key size.")
442 return _enc_dec_rsa(self._backend, self, ciphertext, padding)
444 def public_key(self) -> RSAPublicKey:
445 ctx = self._backend._lib.RSAPublicKey_dup(self._rsa_cdata)
446 self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
447 ctx = self._backend._ffi.gc(ctx, self._backend._lib.RSA_free)
448 evp_pkey = self._backend._rsa_cdata_to_evp_pkey(ctx)
449 return _RSAPublicKey(self._backend, ctx, evp_pkey)
451 def private_numbers(self) -> RSAPrivateNumbers:
452 n = self._backend._ffi.new("BIGNUM **")
453 e = self._backend._ffi.new("BIGNUM **")
454 d = self._backend._ffi.new("BIGNUM **")
455 p = self._backend._ffi.new("BIGNUM **")
456 q = self._backend._ffi.new("BIGNUM **")
457 dmp1 = self._backend._ffi.new("BIGNUM **")
458 dmq1 = self._backend._ffi.new("BIGNUM **")
459 iqmp = self._backend._ffi.new("BIGNUM **")
460 self._backend._lib.RSA_get0_key(self._rsa_cdata, n, e, d)
461 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
462 self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
463 self._backend.openssl_assert(d[0] != self._backend._ffi.NULL)
464 self._backend._lib.RSA_get0_factors(self._rsa_cdata, p, q)
465 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
466 self._backend.openssl_assert(q[0] != self._backend._ffi.NULL)
467 self._backend._lib.RSA_get0_crt_params(
468 self._rsa_cdata, dmp1, dmq1, iqmp
469 )
470 self._backend.openssl_assert(dmp1[0] != self._backend._ffi.NULL)
471 self._backend.openssl_assert(dmq1[0] != self._backend._ffi.NULL)
472 self._backend.openssl_assert(iqmp[0] != self._backend._ffi.NULL)
473 return RSAPrivateNumbers(
474 p=self._backend._bn_to_int(p[0]),
475 q=self._backend._bn_to_int(q[0]),
476 d=self._backend._bn_to_int(d[0]),
477 dmp1=self._backend._bn_to_int(dmp1[0]),
478 dmq1=self._backend._bn_to_int(dmq1[0]),
479 iqmp=self._backend._bn_to_int(iqmp[0]),
480 public_numbers=RSAPublicNumbers(
481 e=self._backend._bn_to_int(e[0]),
482 n=self._backend._bn_to_int(n[0]),
483 ),
484 )
486 def private_bytes(
487 self,
488 encoding: serialization.Encoding,
489 format: serialization.PrivateFormat,
490 encryption_algorithm: serialization.KeySerializationEncryption,
491 ) -> bytes:
492 return self._backend._private_key_bytes(
493 encoding,
494 format,
495 encryption_algorithm,
496 self,
497 self._evp_pkey,
498 self._rsa_cdata,
499 )
501 def sign(
502 self,
503 data: bytes,
504 padding: AsymmetricPadding,
505 algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
506 ) -> bytes:
507 self._enable_blinding()
508 data, algorithm = _calculate_digest_and_algorithm(data, algorithm)
509 return _rsa_sig_sign(self._backend, padding, algorithm, self, data)
512class _RSAPublicKey(RSAPublicKey):
513 _evp_pkey: object
514 _rsa_cdata: object
515 _key_size: int
517 def __init__(self, backend: "Backend", rsa_cdata, evp_pkey):
518 self._backend = backend
519 self._rsa_cdata = rsa_cdata
520 self._evp_pkey = evp_pkey
522 n = self._backend._ffi.new("BIGNUM **")
523 self._backend._lib.RSA_get0_key(
524 self._rsa_cdata,
525 n,
526 self._backend._ffi.NULL,
527 self._backend._ffi.NULL,
528 )
529 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
530 self._key_size = self._backend._lib.BN_num_bits(n[0])
532 @property
533 def key_size(self) -> int:
534 return self._key_size
536 def encrypt(self, plaintext: bytes, padding: AsymmetricPadding) -> bytes:
537 return _enc_dec_rsa(self._backend, self, plaintext, padding)
539 def public_numbers(self) -> RSAPublicNumbers:
540 n = self._backend._ffi.new("BIGNUM **")
541 e = self._backend._ffi.new("BIGNUM **")
542 self._backend._lib.RSA_get0_key(
543 self._rsa_cdata, n, e, self._backend._ffi.NULL
544 )
545 self._backend.openssl_assert(n[0] != self._backend._ffi.NULL)
546 self._backend.openssl_assert(e[0] != self._backend._ffi.NULL)
547 return RSAPublicNumbers(
548 e=self._backend._bn_to_int(e[0]),
549 n=self._backend._bn_to_int(n[0]),
550 )
552 def public_bytes(
553 self,
554 encoding: serialization.Encoding,
555 format: serialization.PublicFormat,
556 ) -> bytes:
557 return self._backend._public_key_bytes(
558 encoding, format, self, self._evp_pkey, self._rsa_cdata
559 )
561 def verify(
562 self,
563 signature: bytes,
564 data: bytes,
565 padding: AsymmetricPadding,
566 algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm],
567 ) -> None:
568 data, algorithm = _calculate_digest_and_algorithm(data, algorithm)
569 _rsa_sig_verify(
570 self._backend, padding, algorithm, self, signature, data
571 )
573 def recover_data_from_signature(
574 self,
575 signature: bytes,
576 padding: AsymmetricPadding,
577 algorithm: typing.Optional[hashes.HashAlgorithm],
578 ) -> bytes:
579 if isinstance(algorithm, asym_utils.Prehashed):
580 raise TypeError(
581 "Prehashed is only supported in the sign and verify methods. "
582 "It cannot be used with recover_data_from_signature."
583 )
584 return _rsa_sig_recover(
585 self._backend, padding, algorithm, self, signature
586 )