Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/dh.py: 17%

173 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:36 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5import typing 

6 

7from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

8from cryptography.hazmat.primitives import serialization 

9from cryptography.hazmat.primitives.asymmetric import dh 

10 

11if typing.TYPE_CHECKING: 

12 from cryptography.hazmat.backends.openssl.backend import Backend 

13 

14 

15def _dh_params_dup(dh_cdata, backend: "Backend"): 

16 lib = backend._lib 

17 ffi = backend._ffi 

18 

19 param_cdata = lib.DHparams_dup(dh_cdata) 

20 backend.openssl_assert(param_cdata != ffi.NULL) 

21 param_cdata = ffi.gc(param_cdata, lib.DH_free) 

22 if lib.CRYPTOGRAPHY_IS_LIBRESSL: 

23 # In libressl DHparams_dup don't copy q 

24 q = ffi.new("BIGNUM **") 

25 lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL) 

26 q_dup = lib.BN_dup(q[0]) 

27 res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL) 

28 backend.openssl_assert(res == 1) 

29 

30 return param_cdata 

31 

32 

33def _dh_cdata_to_parameters(dh_cdata, backend: "Backend") -> "_DHParameters": 

34 param_cdata = _dh_params_dup(dh_cdata, backend) 

35 return _DHParameters(backend, param_cdata) 

36 

37 

38class _DHParameters(dh.DHParameters): 

39 def __init__(self, backend: "Backend", dh_cdata): 

40 self._backend = backend 

41 self._dh_cdata = dh_cdata 

42 

43 def parameter_numbers(self) -> dh.DHParameterNumbers: 

44 p = self._backend._ffi.new("BIGNUM **") 

45 g = self._backend._ffi.new("BIGNUM **") 

46 q = self._backend._ffi.new("BIGNUM **") 

47 self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g) 

48 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL) 

49 self._backend.openssl_assert(g[0] != self._backend._ffi.NULL) 

50 q_val: typing.Optional[int] 

51 if q[0] == self._backend._ffi.NULL: 

52 q_val = None 

53 else: 

54 q_val = self._backend._bn_to_int(q[0]) 

55 return dh.DHParameterNumbers( 

56 p=self._backend._bn_to_int(p[0]), 

57 g=self._backend._bn_to_int(g[0]), 

58 q=q_val, 

59 ) 

60 

61 def generate_private_key(self) -> dh.DHPrivateKey: 

62 return self._backend.generate_dh_private_key(self) 

63 

64 def parameter_bytes( 

65 self, 

66 encoding: serialization.Encoding, 

67 format: serialization.ParameterFormat, 

68 ) -> bytes: 

69 if encoding is serialization.Encoding.OpenSSH: 

70 raise TypeError("OpenSSH encoding is not supported") 

71 

72 if format is not serialization.ParameterFormat.PKCS3: 

73 raise ValueError("Only PKCS3 serialization is supported") 

74 

75 q = self._backend._ffi.new("BIGNUM **") 

76 self._backend._lib.DH_get0_pqg( 

77 self._dh_cdata, self._backend._ffi.NULL, q, self._backend._ffi.NULL 

78 ) 

79 if ( 

80 q[0] != self._backend._ffi.NULL 

81 and not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX 

82 ): 

83 raise UnsupportedAlgorithm( 

84 "DH X9.42 serialization is not supported", 

85 _Reasons.UNSUPPORTED_SERIALIZATION, 

86 ) 

87 

88 if encoding is serialization.Encoding.PEM: 

89 if q[0] != self._backend._ffi.NULL: 

90 write_bio = self._backend._lib.PEM_write_bio_DHxparams 

91 else: 

92 write_bio = self._backend._lib.PEM_write_bio_DHparams 

93 elif encoding is serialization.Encoding.DER: 

94 if q[0] != self._backend._ffi.NULL: 

95 write_bio = self._backend._lib.i2d_DHxparams_bio 

96 else: 

97 write_bio = self._backend._lib.i2d_DHparams_bio 

98 else: 

99 raise TypeError("encoding must be an item from the Encoding enum") 

100 

101 bio = self._backend._create_mem_bio_gc() 

102 res = write_bio(bio, self._dh_cdata) 

103 self._backend.openssl_assert(res == 1) 

104 return self._backend._read_mem_bio(bio) 

105 

106 

107def _get_dh_num_bits(backend, dh_cdata) -> int: 

108 p = backend._ffi.new("BIGNUM **") 

109 backend._lib.DH_get0_pqg(dh_cdata, p, backend._ffi.NULL, backend._ffi.NULL) 

110 backend.openssl_assert(p[0] != backend._ffi.NULL) 

111 return backend._lib.BN_num_bits(p[0]) 

112 

113 

114class _DHPrivateKey(dh.DHPrivateKey): 

115 def __init__(self, backend: "Backend", dh_cdata, evp_pkey): 

116 self._backend = backend 

117 self._dh_cdata = dh_cdata 

118 self._evp_pkey = evp_pkey 

119 self._key_size_bytes = self._backend._lib.DH_size(dh_cdata) 

120 

121 @property 

122 def key_size(self) -> int: 

123 return _get_dh_num_bits(self._backend, self._dh_cdata) 

124 

125 def private_numbers(self) -> dh.DHPrivateNumbers: 

126 p = self._backend._ffi.new("BIGNUM **") 

127 g = self._backend._ffi.new("BIGNUM **") 

128 q = self._backend._ffi.new("BIGNUM **") 

129 self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g) 

130 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL) 

131 self._backend.openssl_assert(g[0] != self._backend._ffi.NULL) 

132 if q[0] == self._backend._ffi.NULL: 

133 q_val = None 

134 else: 

135 q_val = self._backend._bn_to_int(q[0]) 

136 pub_key = self._backend._ffi.new("BIGNUM **") 

137 priv_key = self._backend._ffi.new("BIGNUM **") 

138 self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key) 

139 self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL) 

140 self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL) 

141 return dh.DHPrivateNumbers( 

142 public_numbers=dh.DHPublicNumbers( 

143 parameter_numbers=dh.DHParameterNumbers( 

144 p=self._backend._bn_to_int(p[0]), 

145 g=self._backend._bn_to_int(g[0]), 

146 q=q_val, 

147 ), 

148 y=self._backend._bn_to_int(pub_key[0]), 

149 ), 

150 x=self._backend._bn_to_int(priv_key[0]), 

151 ) 

152 

153 def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes: 

154 if not isinstance(peer_public_key, _DHPublicKey): 

155 raise TypeError("peer_public_key must be a DHPublicKey") 

156 

157 ctx = self._backend._lib.EVP_PKEY_CTX_new( 

158 self._evp_pkey, self._backend._ffi.NULL 

159 ) 

160 self._backend.openssl_assert(ctx != self._backend._ffi.NULL) 

161 ctx = self._backend._ffi.gc(ctx, self._backend._lib.EVP_PKEY_CTX_free) 

162 res = self._backend._lib.EVP_PKEY_derive_init(ctx) 

163 self._backend.openssl_assert(res == 1) 

164 res = self._backend._lib.EVP_PKEY_derive_set_peer( 

165 ctx, peer_public_key._evp_pkey 

166 ) 

167 # Invalid kex errors here in OpenSSL 3.0 because checks were moved 

168 # to EVP_PKEY_derive_set_peer 

169 self._exchange_assert(res == 1) 

170 keylen = self._backend._ffi.new("size_t *") 

171 res = self._backend._lib.EVP_PKEY_derive( 

172 ctx, self._backend._ffi.NULL, keylen 

173 ) 

174 # Invalid kex errors here in OpenSSL < 3 

175 self._exchange_assert(res == 1) 

176 self._backend.openssl_assert(keylen[0] > 0) 

177 buf = self._backend._ffi.new("unsigned char[]", keylen[0]) 

178 res = self._backend._lib.EVP_PKEY_derive(ctx, buf, keylen) 

179 self._backend.openssl_assert(res == 1) 

180 

181 key = self._backend._ffi.buffer(buf, keylen[0])[:] 

182 pad = self._key_size_bytes - len(key) 

183 

184 if pad > 0: 

185 key = (b"\x00" * pad) + key 

186 

187 return key 

188 

189 def _exchange_assert(self, ok: bool) -> None: 

190 if not ok: 

191 errors = self._backend._consume_errors() 

192 raise ValueError( 

193 "Error computing shared key.", 

194 errors, 

195 ) 

196 

197 def public_key(self) -> dh.DHPublicKey: 

198 dh_cdata = _dh_params_dup(self._dh_cdata, self._backend) 

199 pub_key = self._backend._ffi.new("BIGNUM **") 

200 self._backend._lib.DH_get0_key( 

201 self._dh_cdata, pub_key, self._backend._ffi.NULL 

202 ) 

203 self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL) 

204 pub_key_dup = self._backend._lib.BN_dup(pub_key[0]) 

205 self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL) 

206 

207 res = self._backend._lib.DH_set0_key( 

208 dh_cdata, pub_key_dup, self._backend._ffi.NULL 

209 ) 

210 self._backend.openssl_assert(res == 1) 

211 evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata) 

212 return _DHPublicKey(self._backend, dh_cdata, evp_pkey) 

213 

214 def parameters(self) -> dh.DHParameters: 

215 return _dh_cdata_to_parameters(self._dh_cdata, self._backend) 

216 

217 def private_bytes( 

218 self, 

219 encoding: serialization.Encoding, 

220 format: serialization.PrivateFormat, 

221 encryption_algorithm: serialization.KeySerializationEncryption, 

222 ) -> bytes: 

223 if format is not serialization.PrivateFormat.PKCS8: 

224 raise ValueError( 

225 "DH private keys support only PKCS8 serialization" 

226 ) 

227 if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX: 

228 q = self._backend._ffi.new("BIGNUM **") 

229 self._backend._lib.DH_get0_pqg( 

230 self._dh_cdata, 

231 self._backend._ffi.NULL, 

232 q, 

233 self._backend._ffi.NULL, 

234 ) 

235 if q[0] != self._backend._ffi.NULL: 

236 raise UnsupportedAlgorithm( 

237 "DH X9.42 serialization is not supported", 

238 _Reasons.UNSUPPORTED_SERIALIZATION, 

239 ) 

240 

241 return self._backend._private_key_bytes( 

242 encoding, 

243 format, 

244 encryption_algorithm, 

245 self, 

246 self._evp_pkey, 

247 self._dh_cdata, 

248 ) 

249 

250 

251class _DHPublicKey(dh.DHPublicKey): 

252 def __init__(self, backend: "Backend", dh_cdata, evp_pkey): 

253 self._backend = backend 

254 self._dh_cdata = dh_cdata 

255 self._evp_pkey = evp_pkey 

256 self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata) 

257 

258 @property 

259 def key_size(self) -> int: 

260 return self._key_size_bits 

261 

262 def public_numbers(self) -> dh.DHPublicNumbers: 

263 p = self._backend._ffi.new("BIGNUM **") 

264 g = self._backend._ffi.new("BIGNUM **") 

265 q = self._backend._ffi.new("BIGNUM **") 

266 self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g) 

267 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL) 

268 self._backend.openssl_assert(g[0] != self._backend._ffi.NULL) 

269 if q[0] == self._backend._ffi.NULL: 

270 q_val = None 

271 else: 

272 q_val = self._backend._bn_to_int(q[0]) 

273 pub_key = self._backend._ffi.new("BIGNUM **") 

274 self._backend._lib.DH_get0_key( 

275 self._dh_cdata, pub_key, self._backend._ffi.NULL 

276 ) 

277 self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL) 

278 return dh.DHPublicNumbers( 

279 parameter_numbers=dh.DHParameterNumbers( 

280 p=self._backend._bn_to_int(p[0]), 

281 g=self._backend._bn_to_int(g[0]), 

282 q=q_val, 

283 ), 

284 y=self._backend._bn_to_int(pub_key[0]), 

285 ) 

286 

287 def parameters(self) -> dh.DHParameters: 

288 return _dh_cdata_to_parameters(self._dh_cdata, self._backend) 

289 

290 def public_bytes( 

291 self, 

292 encoding: serialization.Encoding, 

293 format: serialization.PublicFormat, 

294 ) -> bytes: 

295 if format is not serialization.PublicFormat.SubjectPublicKeyInfo: 

296 raise ValueError( 

297 "DH public keys support only " 

298 "SubjectPublicKeyInfo serialization" 

299 ) 

300 

301 if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX: 

302 q = self._backend._ffi.new("BIGNUM **") 

303 self._backend._lib.DH_get0_pqg( 

304 self._dh_cdata, 

305 self._backend._ffi.NULL, 

306 q, 

307 self._backend._ffi.NULL, 

308 ) 

309 if q[0] != self._backend._ffi.NULL: 

310 raise UnsupportedAlgorithm( 

311 "DH X9.42 serialization is not supported", 

312 _Reasons.UNSUPPORTED_SERIALIZATION, 

313 ) 

314 

315 return self._backend._public_key_bytes( 

316 encoding, format, self, self._evp_pkey, None 

317 )