Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/dh.py: 17%
173 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
5import typing
7from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
8from cryptography.hazmat.primitives import serialization
9from cryptography.hazmat.primitives.asymmetric import dh
11if typing.TYPE_CHECKING:
12 from cryptography.hazmat.backends.openssl.backend import Backend
def _dh_params_dup(dh_cdata, backend: "Backend"):
    """Return a garbage-collected copy of the parameters in *dh_cdata*.

    The copy is made with ``DHparams_dup`` and registered with ``ffi.gc``
    so that ``DH_free`` releases it. On LibreSSL builds ``q`` is copied
    over separately because ``DHparams_dup`` does not carry it across.

    :param dh_cdata: The ``DH`` object whose parameters are duplicated.
    :param backend: Backend providing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :returns: The duplicated ``DH`` object.
    """
    lib = backend._lib
    ffi = backend._ffi

    param_cdata = lib.DHparams_dup(dh_cdata)
    backend.openssl_assert(param_cdata != ffi.NULL)
    param_cdata = ffi.gc(param_cdata, lib.DH_free)
    if lib.CRYPTOGRAPHY_IS_LIBRESSL:
        # In LibreSSL DHparams_dup doesn't copy q.
        q = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
        # Only copy q when the source actually has one.
        if q[0] != ffi.NULL:
            q_dup = lib.BN_dup(q[0])
            # A failed duplication must not be mistaken for "no q".
            backend.openssl_assert(q_dup != ffi.NULL)
            res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
            backend.openssl_assert(res == 1)

    return param_cdata
def _dh_cdata_to_parameters(dh_cdata, backend: "Backend") -> "_DHParameters":
    """Wrap a duplicate of *dh_cdata*'s parameters in a ``_DHParameters``.

    :param dh_cdata: The ``DH`` object to copy the parameters from.
    :param backend: Backend used for the copy and handed to the wrapper.
    :returns: A ``_DHParameters`` holding the duplicated ``DH`` object.
    """
    param_cdata = _dh_params_dup(dh_cdata, backend)
    return _DHParameters(backend, param_cdata)
class _DHParameters(dh.DHParameters):
    """``dh.DHParameters`` implementation wrapping a ``DH`` object."""

    def __init__(self, backend: "Backend", dh_cdata):
        self._backend = backend
        self._dh_cdata = dh_cdata

    def parameter_numbers(self) -> dh.DHParameterNumbers:
        """Return p, g and q (``None`` when unset) as Python integers."""
        p = self._backend._ffi.new("BIGNUM **")
        g = self._backend._ffi.new("BIGNUM **")
        q = self._backend._ffi.new("BIGNUM **")
        self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
        self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
        self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
        q_val: typing.Optional[int]
        if q[0] == self._backend._ffi.NULL:
            q_val = None
        else:
            q_val = self._backend._bn_to_int(q[0])
        return dh.DHParameterNumbers(
            p=self._backend._bn_to_int(p[0]),
            g=self._backend._bn_to_int(g[0]),
            q=q_val,
        )

    def generate_private_key(self) -> dh.DHPrivateKey:
        """Ask the backend to generate a private key for these parameters."""
        return self._backend.generate_dh_private_key(self)

    def parameter_bytes(
        self,
        encoding: serialization.Encoding,
        format: serialization.ParameterFormat,
    ) -> bytes:
        """Serialize the parameters as PEM or DER.

        The ``DHx`` writers are used when q is set, the plain ``DH``
        writers otherwise.

        :raises TypeError: If *encoding* is ``OpenSSH`` or is neither
            ``PEM`` nor ``DER``.
        :raises ValueError: If *format* is not ``PKCS3``.
        :raises UnsupportedAlgorithm: If q is set and
            ``Cryptography_HAS_EVP_PKEY_DHX`` is false.
        """
        if encoding is serialization.Encoding.OpenSSH:
            raise TypeError("OpenSSH encoding is not supported")

        if format is not serialization.ParameterFormat.PKCS3:
            raise ValueError("Only PKCS3 serialization is supported")

        q = self._backend._ffi.new("BIGNUM **")
        self._backend._lib.DH_get0_pqg(
            self._dh_cdata, self._backend._ffi.NULL, q, self._backend._ffi.NULL
        )
        # Computed once; it drives both the support check and the choice
        # of writer function below.
        has_q = q[0] != self._backend._ffi.NULL
        if has_q and not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            raise UnsupportedAlgorithm(
                "DH X9.42 serialization is not supported",
                _Reasons.UNSUPPORTED_SERIALIZATION,
            )

        if encoding is serialization.Encoding.PEM:
            if has_q:
                write_bio = self._backend._lib.PEM_write_bio_DHxparams
            else:
                write_bio = self._backend._lib.PEM_write_bio_DHparams
        elif encoding is serialization.Encoding.DER:
            if has_q:
                write_bio = self._backend._lib.i2d_DHxparams_bio
            else:
                write_bio = self._backend._lib.i2d_DHparams_bio
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        bio = self._backend._create_mem_bio_gc()
        res = write_bio(bio, self._dh_cdata)
        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)
def _get_dh_num_bits(backend: "Backend", dh_cdata) -> int:
    """Return the bit length of the prime p stored in *dh_cdata*.

    :param backend: Backend providing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param dh_cdata: The ``DH`` object to inspect.
    :returns: The result of ``BN_num_bits`` on p.
    """
    p = backend._ffi.new("BIGNUM **")
    # Only p is requested; the q and g output slots are passed as NULL.
    backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL)
    backend.openssl_assert(p[0] != backend._ffi.NULL)
    return backend._lib.BN_num_bits(p[0])
class _DHPrivateKey(dh.DHPrivateKey):
    """``dh.DHPrivateKey`` implementation wrapping a ``DH``/``EVP_PKEY`` pair."""

    def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Cached so exchange() can left-pad shared secrets to this length.
        self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)

    @property
    def key_size(self) -> int:
        """Bit length of the prime p."""
        return _get_dh_num_bits(self._backend, self._dh_cdata)

    def private_numbers(self) -> dh.DHPrivateNumbers:
        """Return p, g, q, the public value y and the private value x."""
        p = self._backend._ffi.new("BIGNUM **")
        g = self._backend._ffi.new("BIGNUM **")
        q = self._backend._ffi.new("BIGNUM **")
        self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
        self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
        self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
        q_val: typing.Optional[int]
        if q[0] == self._backend._ffi.NULL:
            q_val = None
        else:
            q_val = self._backend._bn_to_int(q[0])
        pub_key = self._backend._ffi.new("BIGNUM **")
        priv_key = self._backend._ffi.new("BIGNUM **")
        self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key)
        self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
        self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL)
        return dh.DHPrivateNumbers(
            public_numbers=dh.DHPublicNumbers(
                parameter_numbers=dh.DHParameterNumbers(
                    p=self._backend._bn_to_int(p[0]),
                    g=self._backend._bn_to_int(g[0]),
                    q=q_val,
                ),
                y=self._backend._bn_to_int(pub_key[0]),
            ),
            x=self._backend._bn_to_int(priv_key[0]),
        )

    def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes:
        """Derive the shared secret with *peer_public_key*.

        The derived bytes are left-padded with zero bytes up to the
        ``DH_size`` value recorded at construction.

        :raises TypeError: If *peer_public_key* is not a ``_DHPublicKey``.
        :raises ValueError: If setting the peer key or sizing the output
            fails.
        """
        if not isinstance(peer_public_key, _DHPublicKey):
            raise TypeError("peer_public_key must be a DHPublicKey")

        ctx = self._backend._lib.EVP_PKEY_CTX_new(
            self._evp_pkey, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
        ctx = self._backend._ffi.gc(ctx, self._backend._lib.EVP_PKEY_CTX_free)
        res = self._backend._lib.EVP_PKEY_derive_init(ctx)
        self._backend.openssl_assert(res == 1)
        res = self._backend._lib.EVP_PKEY_derive_set_peer(
            ctx, peer_public_key._evp_pkey
        )
        # Invalid kex errors here in OpenSSL 3.0 because checks were moved
        # to EVP_PKEY_derive_set_peer
        self._exchange_assert(res == 1)
        keylen = self._backend._ffi.new("size_t *")
        # First call passes a NULL buffer so only the length is reported.
        res = self._backend._lib.EVP_PKEY_derive(
            ctx, self._backend._ffi.NULL, keylen
        )
        # Invalid kex errors here in OpenSSL < 3
        self._exchange_assert(res == 1)
        self._backend.openssl_assert(keylen[0] > 0)
        buf = self._backend._ffi.new("unsigned char[]", keylen[0])
        res = self._backend._lib.EVP_PKEY_derive(ctx, buf, keylen)
        self._backend.openssl_assert(res == 1)

        key = self._backend._ffi.buffer(buf, keylen[0])[:]
        pad = self._key_size_bytes - len(key)

        if pad > 0:
            key = (b"\x00" * pad) + key

        return key

    def _exchange_assert(self, ok: bool) -> None:
        """Raise ``ValueError`` carrying the consumed errors when not *ok*."""
        if not ok:
            errors = self._backend._consume_errors()
            raise ValueError(
                "Error computing shared key.",
                errors,
            )

    def public_key(self) -> dh.DHPublicKey:
        """Return a ``_DHPublicKey`` built from copies of the parameters and y."""
        dh_cdata = _dh_params_dup(self._dh_cdata, self._backend)
        pub_key = self._backend._ffi.new("BIGNUM **")
        self._backend._lib.DH_get0_key(
            self._dh_cdata, pub_key, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
        pub_key_dup = self._backend._lib.BN_dup(pub_key[0])
        self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL)

        # Only the public value is set; the private slot is passed as NULL.
        res = self._backend._lib.DH_set0_key(
            dh_cdata, pub_key_dup, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(res == 1)
        evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata)
        return _DHPublicKey(self._backend, dh_cdata, evp_pkey)

    def parameters(self) -> dh.DHParameters:
        """Return a ``_DHParameters`` holding a copy of this key's parameters."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def private_bytes(
        self,
        encoding: serialization.Encoding,
        format: serialization.PrivateFormat,
        encryption_algorithm: serialization.KeySerializationEncryption,
    ) -> bytes:
        """Serialize the key via the backend's ``_private_key_bytes``.

        :raises ValueError: If *format* is not ``PKCS8``.
        :raises UnsupportedAlgorithm: If q is set and
            ``Cryptography_HAS_EVP_PKEY_DHX`` is false.
        """
        if format is not serialization.PrivateFormat.PKCS8:
            raise ValueError(
                "DH private keys support only PKCS8 serialization"
            )
        if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            q = self._backend._ffi.new("BIGNUM **")
            self._backend._lib.DH_get0_pqg(
                self._dh_cdata,
                self._backend._ffi.NULL,
                q,
                self._backend._ffi.NULL,
            )
            if q[0] != self._backend._ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION,
                )

        return self._backend._private_key_bytes(
            encoding,
            format,
            encryption_algorithm,
            self,
            self._evp_pkey,
            self._dh_cdata,
        )
class _DHPublicKey(dh.DHPublicKey):
    """``dh.DHPublicKey`` implementation wrapping a ``DH``/``EVP_PKEY`` pair."""

    def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Computed once here; key_size returns this cached value.
        self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)

    @property
    def key_size(self) -> int:
        """Bit length of the prime p, as measured at construction."""
        return self._key_size_bits

    def public_numbers(self) -> dh.DHPublicNumbers:
        """Return p, g, q (``None`` when unset) and the public value y."""
        p = self._backend._ffi.new("BIGNUM **")
        g = self._backend._ffi.new("BIGNUM **")
        q = self._backend._ffi.new("BIGNUM **")
        self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g)
        self._backend.openssl_assert(p[0] != self._backend._ffi.NULL)
        self._backend.openssl_assert(g[0] != self._backend._ffi.NULL)
        q_val: typing.Optional[int]
        if q[0] == self._backend._ffi.NULL:
            q_val = None
        else:
            q_val = self._backend._bn_to_int(q[0])
        pub_key = self._backend._ffi.new("BIGNUM **")
        self._backend._lib.DH_get0_key(
            self._dh_cdata, pub_key, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL)
        return dh.DHPublicNumbers(
            parameter_numbers=dh.DHParameterNumbers(
                p=self._backend._bn_to_int(p[0]),
                g=self._backend._bn_to_int(g[0]),
                q=q_val,
            ),
            y=self._backend._bn_to_int(pub_key[0]),
        )

    def parameters(self) -> dh.DHParameters:
        """Return a ``_DHParameters`` holding a copy of this key's parameters."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def public_bytes(
        self,
        encoding: serialization.Encoding,
        format: serialization.PublicFormat,
    ) -> bytes:
        """Serialize the key via the backend's ``_public_key_bytes``.

        :raises ValueError: If *format* is not ``SubjectPublicKeyInfo``.
        :raises UnsupportedAlgorithm: If q is set and
            ``Cryptography_HAS_EVP_PKEY_DHX`` is false.
        """
        if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
            raise ValueError(
                "DH public keys support only "
                "SubjectPublicKeyInfo serialization"
            )

        if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            q = self._backend._ffi.new("BIGNUM **")
            self._backend._lib.DH_get0_pqg(
                self._dh_cdata,
                self._backend._ffi.NULL,
                q,
                self._backend._ffi.NULL,
            )
            if q[0] != self._backend._ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION,
                )

        return self._backend._public_key_bytes(
            encoding, format, self, self._evp_pkey, None
        )