Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/dh.py: 17%

173 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5import typing 

6 

7from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 

8from cryptography.hazmat.primitives import serialization 

9from cryptography.hazmat.primitives.asymmetric import dh 

10 

11 

12if typing.TYPE_CHECKING: 

13 from cryptography.hazmat.backends.openssl.backend import Backend 

14 

15 

def _dh_params_dup(dh_cdata, backend: "Backend"):
    """Return a garbage-collected duplicate of the parameters in *dh_cdata*.

    The copy is created with ``DHparams_dup`` and registered with ``ffi.gc``
    using ``DH_free`` as its destructor.  When the binding reports LibreSSL,
    ``q`` is copied over explicitly afterwards.

    :param dh_cdata: The ``DH *`` cdata whose parameters are duplicated.
    :param backend: Backend providing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :return: A new ``DH *`` cdata holding the duplicated parameters.
    """
    lib = backend._lib
    ffi = backend._ffi

    param_cdata = lib.DHparams_dup(dh_cdata)
    backend.openssl_assert(param_cdata != ffi.NULL)
    param_cdata = ffi.gc(param_cdata, lib.DH_free)
    if lib.CRYPTOGRAPHY_IS_LIBRESSL:
        # In LibreSSL DHparams_dup doesn't copy q, so carry it over by hand.
        q = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
        # Parameters without q have nothing to copy.
        if q[0] != ffi.NULL:
            q_dup = lib.BN_dup(q[0])
            # A failed BN_dup must not silently drop q from the copy.
            backend.openssl_assert(q_dup != ffi.NULL)
            res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
            backend.openssl_assert(res == 1)

    return param_cdata

32 

33 

def _dh_cdata_to_parameters(dh_cdata, backend: "Backend") -> "_DHParameters":
    """Build a ``_DHParameters`` around a duplicate of *dh_cdata*'s params."""
    return _DHParameters(backend, _dh_params_dup(dh_cdata, backend))

37 

38 

class _DHParameters(dh.DHParameters):
    """DH parameters backed by a ``DH *`` cdata handle."""

    def __init__(self, backend: "Backend", dh_cdata):
        self._dh_cdata = dh_cdata
        self._backend = backend

    def parameter_numbers(self) -> dh.DHParameterNumbers:
        """Return p, g and (when present) q as Python integers."""
        backend = self._backend
        ffi = backend._ffi
        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        backend._lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        # q is optional: a NULL pointer is reported as None.
        q_val: typing.Optional[int] = (
            None if q_ptr[0] == ffi.NULL else backend._bn_to_int(q_ptr[0])
        )
        return dh.DHParameterNumbers(
            p=backend._bn_to_int(p_ptr[0]),
            g=backend._bn_to_int(g_ptr[0]),
            q=q_val,
        )

    def generate_private_key(self) -> dh.DHPrivateKey:
        """Delegate key generation for these parameters to the backend."""
        backend = self._backend
        return backend.generate_dh_private_key(self)

    def parameter_bytes(
        self,
        encoding: serialization.Encoding,
        format: serialization.ParameterFormat,
    ) -> bytes:
        """Serialize the parameters as PKCS3 in PEM or DER.

        :raises TypeError: If *encoding* is OpenSSH or is neither PEM nor
            DER.
        :raises ValueError: If *format* is not PKCS3.
        :raises UnsupportedAlgorithm: If q is set and the binding lacks
            ``Cryptography_HAS_EVP_PKEY_DHX``.
        """
        if encoding is serialization.Encoding.OpenSSH:
            raise TypeError("OpenSSH encoding is not supported")
        if format is not serialization.ParameterFormat.PKCS3:
            raise ValueError("Only PKCS3 serialization is supported")

        backend = self._backend
        lib, ffi = backend._lib, backend._ffi

        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL)
        has_q = q_ptr[0] != ffi.NULL
        if has_q and not lib.Cryptography_HAS_EVP_PKEY_DHX:
            raise UnsupportedAlgorithm(
                "DH X9.42 serialization is not supported",
                _Reasons.UNSUPPORTED_SERIALIZATION,
            )

        # Parameters carrying q go through the DHx writers; the writer
        # attribute is only looked up for the branch actually taken.
        if encoding is serialization.Encoding.PEM:
            write_bio = (
                lib.PEM_write_bio_DHxparams
                if has_q
                else lib.PEM_write_bio_DHparams
            )
        elif encoding is serialization.Encoding.DER:
            write_bio = (
                lib.Cryptography_i2d_DHxparams_bio
                if has_q
                else lib.i2d_DHparams_bio
            )
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        bio = backend._create_mem_bio_gc()
        res = write_bio(bio, self._dh_cdata)
        backend.openssl_assert(res == 1)
        return backend._read_mem_bio(bio)

106 

107 

def _get_dh_num_bits(backend, dh_cdata) -> int:
    """Return ``BN_num_bits`` of the ``p`` parameter stored in *dh_cdata*."""
    ffi, lib = backend._ffi, backend._lib
    null = ffi.NULL
    p_ptr = ffi.new("BIGNUM **")
    # Only p is requested; q and g are passed as NULL.
    lib.DH_get0_pqg(dh_cdata, p_ptr, null, null)
    backend.openssl_assert(p_ptr[0] != null)
    return lib.BN_num_bits(p_ptr[0])

113 

114 

class _DHPrivateKey(dh.DHPrivateKey):
    """DH private key backed by a ``DH *`` cdata and its ``EVP_PKEY``."""

    def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Cached so exchange() can left-pad the shared secret to this size.
        self._key_size_bytes = backend._lib.DH_size(dh_cdata)

    @property
    def key_size(self) -> int:
        """Bit length of p, computed from the DH struct on each access."""
        return _get_dh_num_bits(self._backend, self._dh_cdata)

    def private_numbers(self) -> dh.DHPrivateNumbers:
        """Return the parameters and both key values as Python integers."""
        backend = self._backend
        lib, ffi = backend._lib, backend._ffi
        to_int = backend._bn_to_int

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        # q is optional: a NULL pointer is reported as None.
        q_val = None if q_ptr[0] == ffi.NULL else to_int(q_ptr[0])

        pub_ptr = ffi.new("BIGNUM **")
        priv_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, pub_ptr, priv_ptr)
        backend.openssl_assert(pub_ptr[0] != ffi.NULL)
        backend.openssl_assert(priv_ptr[0] != ffi.NULL)

        parameter_numbers = dh.DHParameterNumbers(
            p=to_int(p_ptr[0]),
            g=to_int(g_ptr[0]),
            q=q_val,
        )
        public_numbers = dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=to_int(pub_ptr[0]),
        )
        return dh.DHPrivateNumbers(
            public_numbers=public_numbers,
            x=to_int(priv_ptr[0]),
        )

    def exchange(self, peer_public_key: dh.DHPublicKey) -> bytes:
        """Derive the shared key with *peer_public_key*.

        The result is left-padded with zero bytes up to the ``DH_size``
        value cached at construction.

        :raises TypeError: If *peer_public_key* is not a ``_DHPublicKey``.
        :raises ValueError: If setting the peer or querying the derived
            length fails.
        """
        if not isinstance(peer_public_key, _DHPublicKey):
            raise TypeError("peer_public_key must be a DHPublicKey")

        backend = self._backend
        lib, ffi = backend._lib, backend._ffi

        ctx = lib.EVP_PKEY_CTX_new(self._evp_pkey, ffi.NULL)
        backend.openssl_assert(ctx != ffi.NULL)
        ctx = ffi.gc(ctx, lib.EVP_PKEY_CTX_free)
        res = lib.EVP_PKEY_derive_init(ctx)
        backend.openssl_assert(res == 1)

        res = lib.EVP_PKEY_derive_set_peer(ctx, peer_public_key._evp_pkey)
        # Invalid kex errors here in OpenSSL 3.0 because checks were moved
        # to EVP_PKEY_derive_set_peer
        self._exchange_assert(res == 1)

        # First derive call (NULL buffer) only reports the needed length.
        keylen = ffi.new("size_t *")
        res = lib.EVP_PKEY_derive(ctx, ffi.NULL, keylen)
        # Invalid kex errors here in OpenSSL < 3
        self._exchange_assert(res == 1)
        backend.openssl_assert(keylen[0] > 0)

        buf = ffi.new("unsigned char[]", keylen[0])
        res = lib.EVP_PKEY_derive(ctx, buf, keylen)
        backend.openssl_assert(res == 1)

        derived = ffi.buffer(buf, keylen[0])[:]
        # rjust is a no-op when the secret is already at least that long.
        return derived.rjust(self._key_size_bytes, b"\x00")

    def _exchange_assert(self, ok: bool) -> None:
        """Raise ``ValueError`` with the consumed error queue unless *ok*."""
        if ok:
            return
        errors_with_text = self._backend._consume_errors_with_text()
        raise ValueError("Error computing shared key.", errors_with_text)

    def public_key(self) -> dh.DHPublicKey:
        """Return a public key built from copies of the params and y."""
        backend = self._backend
        lib, ffi = backend._lib, backend._ffi

        public_cdata = _dh_params_dup(self._dh_cdata, backend)
        y_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, y_ptr, ffi.NULL)
        backend.openssl_assert(y_ptr[0] != ffi.NULL)
        y_copy = lib.BN_dup(y_ptr[0])
        backend.openssl_assert(y_copy != ffi.NULL)

        # Only the public value is installed on the duplicate.
        res = lib.DH_set0_key(public_cdata, y_copy, ffi.NULL)
        backend.openssl_assert(res == 1)
        evp_pkey = backend._dh_cdata_to_evp_pkey(public_cdata)
        return _DHPublicKey(backend, public_cdata, evp_pkey)

    def parameters(self) -> dh.DHParameters:
        """Return the key's parameters as a separate parameters object."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def private_bytes(
        self,
        encoding: serialization.Encoding,
        format: serialization.PrivateFormat,
        encryption_algorithm: serialization.KeySerializationEncryption,
    ) -> bytes:
        """Serialize the private key; only PKCS8 is accepted.

        :raises ValueError: If *format* is not PKCS8.
        :raises UnsupportedAlgorithm: If q is set and the binding lacks
            ``Cryptography_HAS_EVP_PKEY_DHX``.
        """
        if format is not serialization.PrivateFormat.PKCS8:
            raise ValueError(
                "DH private keys support only PKCS8 serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION,
                )

        return backend._private_key_bytes(
            encoding,
            format,
            encryption_algorithm,
            self,
            self._evp_pkey,
            self._dh_cdata,
        )

250 

251 

class _DHPublicKey(dh.DHPublicKey):
    """DH public key backed by a ``DH *`` cdata and its ``EVP_PKEY``."""

    def __init__(self, backend: "Backend", dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Computed once here; key_size returns the cached value.
        self._key_size_bits = _get_dh_num_bits(backend, dh_cdata)

    @property
    def key_size(self) -> int:
        """Bit length of p, as cached at construction."""
        return self._key_size_bits

    def public_numbers(self) -> dh.DHPublicNumbers:
        """Return the parameters and public value y as Python integers."""
        backend = self._backend
        lib, ffi = backend._lib, backend._ffi
        to_int = backend._bn_to_int

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        # q is optional: a NULL pointer is reported as None.
        q_val = None if q_ptr[0] == ffi.NULL else to_int(q_ptr[0])

        y_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, y_ptr, ffi.NULL)
        backend.openssl_assert(y_ptr[0] != ffi.NULL)

        parameter_numbers = dh.DHParameterNumbers(
            p=to_int(p_ptr[0]),
            g=to_int(g_ptr[0]),
            q=q_val,
        )
        return dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=to_int(y_ptr[0]),
        )

    def parameters(self) -> dh.DHParameters:
        """Return the key's parameters as a separate parameters object."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def public_bytes(
        self,
        encoding: serialization.Encoding,
        format: serialization.PublicFormat,
    ) -> bytes:
        """Serialize the public key; only SubjectPublicKeyInfo is accepted.

        :raises ValueError: If *format* is not SubjectPublicKeyInfo.
        :raises UnsupportedAlgorithm: If q is set and the binding lacks
            ``Cryptography_HAS_EVP_PKEY_DHX``.
        """
        if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
            raise ValueError(
                "DH public keys support only "
                "SubjectPublicKeyInfo serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION,
                )

        return backend._public_key_bytes(
            encoding, format, self, self._evp_pkey, None
        )