Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 72%

195 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 07:26 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import typing 

8 

9from cryptography.exceptions import InvalidTag 

10 

11if typing.TYPE_CHECKING: 

12 from cryptography.hazmat.backends.openssl.backend import Backend 

13 from cryptography.hazmat.primitives.ciphers.aead import ( 

14 AESCCM, 

15 AESGCM, 

16 ChaCha20Poly1305, 

17 ) 

18 

19 _AEADTypes = typing.Union[AESCCM, AESGCM, ChaCha20Poly1305] 

20 

21 

22def _is_evp_aead_supported_cipher( 

23 backend: Backend, cipher: _AEADTypes 

24) -> bool: 

25 """ 

26 Checks whether the given cipher is supported through 

27 EVP_AEAD rather than the normal OpenSSL EVP_CIPHER API. 

28 """ 

29 from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 

30 

31 return backend._lib.Cryptography_HAS_EVP_AEAD and isinstance( 

32 cipher, ChaCha20Poly1305 

33 ) 

34 

35 

36def _aead_cipher_supported(backend: Backend, cipher: _AEADTypes) -> bool: 

37 if _is_evp_aead_supported_cipher(backend, cipher): 

38 return True 

39 else: 

40 cipher_name = _evp_cipher_cipher_name(cipher) 

41 if backend._fips_enabled and cipher_name not in backend._fips_aead: 

42 return False 

43 return ( 

44 backend._lib.EVP_get_cipherbyname(cipher_name) != backend._ffi.NULL 

45 ) 

46 

47 

48def _aead_create_ctx( 

49 backend: Backend, 

50 cipher: _AEADTypes, 

51 key: bytes, 

52): 

53 if _is_evp_aead_supported_cipher(backend, cipher): 

54 return _evp_aead_create_ctx(backend, cipher, key) 

55 else: 

56 return _evp_cipher_create_ctx(backend, cipher, key) 

57 

58 

59def _encrypt( 

60 backend: Backend, 

61 cipher: _AEADTypes, 

62 nonce: bytes, 

63 data: bytes, 

64 associated_data: list[bytes], 

65 tag_length: int, 

66 ctx: typing.Any = None, 

67) -> bytes: 

68 if _is_evp_aead_supported_cipher(backend, cipher): 

69 return _evp_aead_encrypt( 

70 backend, cipher, nonce, data, associated_data, tag_length, ctx 

71 ) 

72 else: 

73 return _evp_cipher_encrypt( 

74 backend, cipher, nonce, data, associated_data, tag_length, ctx 

75 ) 

76 

77 

78def _decrypt( 

79 backend: Backend, 

80 cipher: _AEADTypes, 

81 nonce: bytes, 

82 data: bytes, 

83 associated_data: list[bytes], 

84 tag_length: int, 

85 ctx: typing.Any = None, 

86) -> bytes: 

87 if _is_evp_aead_supported_cipher(backend, cipher): 

88 return _evp_aead_decrypt( 

89 backend, cipher, nonce, data, associated_data, tag_length, ctx 

90 ) 

91 else: 

92 return _evp_cipher_decrypt( 

93 backend, cipher, nonce, data, associated_data, tag_length, ctx 

94 ) 

95 

96 

97def _evp_aead_create_ctx( 

98 backend: Backend, 

99 cipher: _AEADTypes, 

100 key: bytes, 

101 tag_len: int | None = None, 

102): 

103 aead_cipher = _evp_aead_get_cipher(backend, cipher) 

104 assert aead_cipher is not None 

105 key_ptr = backend._ffi.from_buffer(key) 

106 tag_len = ( 

107 backend._lib.EVP_AEAD_DEFAULT_TAG_LENGTH 

108 if tag_len is None 

109 else tag_len 

110 ) 

111 ctx = backend._lib.Cryptography_EVP_AEAD_CTX_new( 

112 aead_cipher, key_ptr, len(key), tag_len 

113 ) 

114 backend.openssl_assert(ctx != backend._ffi.NULL) 

115 ctx = backend._ffi.gc(ctx, backend._lib.EVP_AEAD_CTX_free) 

116 return ctx 

117 

118 

119def _evp_aead_get_cipher(backend: Backend, cipher: _AEADTypes): 

120 from cryptography.hazmat.primitives.ciphers.aead import ( 

121 ChaCha20Poly1305, 

122 ) 

123 

124 # Currently only ChaCha20-Poly1305 is supported using this API 

125 assert isinstance(cipher, ChaCha20Poly1305) 

126 return backend._lib.EVP_aead_chacha20_poly1305() 

127 

128 

129def _evp_aead_encrypt( 

130 backend: Backend, 

131 cipher: _AEADTypes, 

132 nonce: bytes, 

133 data: bytes, 

134 associated_data: list[bytes], 

135 tag_length: int, 

136 ctx: typing.Any, 

137) -> bytes: 

138 assert ctx is not None 

139 

140 aead_cipher = _evp_aead_get_cipher(backend, cipher) 

141 assert aead_cipher is not None 

142 

143 out_len = backend._ffi.new("size_t *") 

144 # max_out_len should be in_len plus the result of 

145 # EVP_AEAD_max_overhead. 

146 max_out_len = len(data) + backend._lib.EVP_AEAD_max_overhead(aead_cipher) 

147 out_buf = backend._ffi.new("uint8_t[]", max_out_len) 

148 data_ptr = backend._ffi.from_buffer(data) 

149 nonce_ptr = backend._ffi.from_buffer(nonce) 

150 aad = b"".join(associated_data) 

151 aad_ptr = backend._ffi.from_buffer(aad) 

152 

153 res = backend._lib.EVP_AEAD_CTX_seal( 

154 ctx, 

155 out_buf, 

156 out_len, 

157 max_out_len, 

158 nonce_ptr, 

159 len(nonce), 

160 data_ptr, 

161 len(data), 

162 aad_ptr, 

163 len(aad), 

164 ) 

165 backend.openssl_assert(res == 1) 

166 encrypted_data = backend._ffi.buffer(out_buf, out_len[0])[:] 

167 return encrypted_data 

168 

169 

170def _evp_aead_decrypt( 

171 backend: Backend, 

172 cipher: _AEADTypes, 

173 nonce: bytes, 

174 data: bytes, 

175 associated_data: list[bytes], 

176 tag_length: int, 

177 ctx: typing.Any, 

178) -> bytes: 

179 if len(data) < tag_length: 

180 raise InvalidTag 

181 

182 assert ctx is not None 

183 

184 out_len = backend._ffi.new("size_t *") 

185 # max_out_len should at least in_len 

186 max_out_len = len(data) 

187 out_buf = backend._ffi.new("uint8_t[]", max_out_len) 

188 data_ptr = backend._ffi.from_buffer(data) 

189 nonce_ptr = backend._ffi.from_buffer(nonce) 

190 aad = b"".join(associated_data) 

191 aad_ptr = backend._ffi.from_buffer(aad) 

192 

193 res = backend._lib.EVP_AEAD_CTX_open( 

194 ctx, 

195 out_buf, 

196 out_len, 

197 max_out_len, 

198 nonce_ptr, 

199 len(nonce), 

200 data_ptr, 

201 len(data), 

202 aad_ptr, 

203 len(aad), 

204 ) 

205 

206 if res == 0: 

207 backend._consume_errors() 

208 raise InvalidTag 

209 

210 decrypted_data = backend._ffi.buffer(out_buf, out_len[0])[:] 

211 return decrypted_data 

212 

213 

214_ENCRYPT = 1 

215_DECRYPT = 0 

216 

217 

218def _evp_cipher_cipher_name(cipher: _AEADTypes) -> bytes: 

219 from cryptography.hazmat.primitives.ciphers.aead import ( 

220 AESCCM, 

221 AESGCM, 

222 ChaCha20Poly1305, 

223 ) 

224 

225 if isinstance(cipher, ChaCha20Poly1305): 

226 return b"chacha20-poly1305" 

227 elif isinstance(cipher, AESCCM): 

228 return f"aes-{len(cipher._key) * 8}-ccm".encode("ascii") 

229 else: 

230 assert isinstance(cipher, AESGCM) 

231 return f"aes-{len(cipher._key) * 8}-gcm".encode("ascii") 

232 

233 

234def _evp_cipher(cipher_name: bytes, backend: Backend): 

235 evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name) 

236 backend.openssl_assert(evp_cipher != backend._ffi.NULL) 

237 return evp_cipher 

238 

239 

240def _evp_cipher_create_ctx( 

241 backend: Backend, 

242 cipher: _AEADTypes, 

243 key: bytes, 

244): 

245 ctx = backend._lib.EVP_CIPHER_CTX_new() 

246 backend.openssl_assert(ctx != backend._ffi.NULL) 

247 ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free) 

248 cipher_name = _evp_cipher_cipher_name(cipher) 

249 evp_cipher = _evp_cipher(cipher_name, backend) 

250 key_ptr = backend._ffi.from_buffer(key) 

251 res = backend._lib.EVP_CipherInit_ex( 

252 ctx, 

253 evp_cipher, 

254 backend._ffi.NULL, 

255 key_ptr, 

256 backend._ffi.NULL, 

257 0, 

258 ) 

259 backend.openssl_assert(res != 0) 

260 return ctx 

261 

262 

263def _evp_cipher_aead_setup( 

264 backend: Backend, 

265 cipher_name: bytes, 

266 key: bytes, 

267 nonce: bytes, 

268 tag: bytes | None, 

269 tag_len: int, 

270 operation: int, 

271): 

272 evp_cipher = _evp_cipher(cipher_name, backend) 

273 ctx = backend._lib.EVP_CIPHER_CTX_new() 

274 ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free) 

275 res = backend._lib.EVP_CipherInit_ex( 

276 ctx, 

277 evp_cipher, 

278 backend._ffi.NULL, 

279 backend._ffi.NULL, 

280 backend._ffi.NULL, 

281 int(operation == _ENCRYPT), 

282 ) 

283 backend.openssl_assert(res != 0) 

284 # CCM requires the IVLEN to be set before calling SET_TAG on decrypt 

285 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

286 ctx, 

287 backend._lib.EVP_CTRL_AEAD_SET_IVLEN, 

288 len(nonce), 

289 backend._ffi.NULL, 

290 ) 

291 backend.openssl_assert(res != 0) 

292 if operation == _DECRYPT: 

293 assert tag is not None 

294 _evp_cipher_set_tag(backend, ctx, tag) 

295 elif cipher_name.endswith(b"-ccm"): 

296 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

297 ctx, 

298 backend._lib.EVP_CTRL_AEAD_SET_TAG, 

299 tag_len, 

300 backend._ffi.NULL, 

301 ) 

302 backend.openssl_assert(res != 0) 

303 

304 nonce_ptr = backend._ffi.from_buffer(nonce) 

305 key_ptr = backend._ffi.from_buffer(key) 

306 res = backend._lib.EVP_CipherInit_ex( 

307 ctx, 

308 backend._ffi.NULL, 

309 backend._ffi.NULL, 

310 key_ptr, 

311 nonce_ptr, 

312 int(operation == _ENCRYPT), 

313 ) 

314 backend.openssl_assert(res != 0) 

315 return ctx 

316 

317 

318def _evp_cipher_set_tag(backend, ctx, tag: bytes) -> None: 

319 tag_ptr = backend._ffi.from_buffer(tag) 

320 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

321 ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag_ptr 

322 ) 

323 backend.openssl_assert(res != 0) 

324 

325 

326def _evp_cipher_set_nonce_operation( 

327 backend, ctx, nonce: bytes, operation: int 

328) -> None: 

329 nonce_ptr = backend._ffi.from_buffer(nonce) 

330 res = backend._lib.EVP_CipherInit_ex( 

331 ctx, 

332 backend._ffi.NULL, 

333 backend._ffi.NULL, 

334 backend._ffi.NULL, 

335 nonce_ptr, 

336 int(operation == _ENCRYPT), 

337 ) 

338 backend.openssl_assert(res != 0) 

339 

340 

341def _evp_cipher_set_length(backend: Backend, ctx, data_len: int) -> None: 

342 intptr = backend._ffi.new("int *") 

343 res = backend._lib.EVP_CipherUpdate( 

344 ctx, backend._ffi.NULL, intptr, backend._ffi.NULL, data_len 

345 ) 

346 backend.openssl_assert(res != 0) 

347 

348 

349def _evp_cipher_process_aad( 

350 backend: Backend, ctx, associated_data: bytes 

351) -> None: 

352 outlen = backend._ffi.new("int *") 

353 a_data_ptr = backend._ffi.from_buffer(associated_data) 

354 res = backend._lib.EVP_CipherUpdate( 

355 ctx, backend._ffi.NULL, outlen, a_data_ptr, len(associated_data) 

356 ) 

357 backend.openssl_assert(res != 0) 

358 

359 

360def _evp_cipher_process_data(backend: Backend, ctx, data: bytes) -> bytes: 

361 outlen = backend._ffi.new("int *") 

362 buf = backend._ffi.new("unsigned char[]", len(data)) 

363 data_ptr = backend._ffi.from_buffer(data) 

364 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data_ptr, len(data)) 

365 backend.openssl_assert(res != 0) 

366 return backend._ffi.buffer(buf, outlen[0])[:] 

367 

368 

369def _evp_cipher_encrypt( 

370 backend: Backend, 

371 cipher: _AEADTypes, 

372 nonce: bytes, 

373 data: bytes, 

374 associated_data: list[bytes], 

375 tag_length: int, 

376 ctx: typing.Any = None, 

377) -> bytes: 

378 from cryptography.hazmat.primitives.ciphers.aead import AESCCM 

379 

380 if ctx is None: 

381 cipher_name = _evp_cipher_cipher_name(cipher) 

382 ctx = _evp_cipher_aead_setup( 

383 backend, 

384 cipher_name, 

385 cipher._key, 

386 nonce, 

387 None, 

388 tag_length, 

389 _ENCRYPT, 

390 ) 

391 else: 

392 _evp_cipher_set_nonce_operation(backend, ctx, nonce, _ENCRYPT) 

393 

394 # CCM requires us to pass the length of the data before processing 

395 # anything. 

396 # However calling this with any other AEAD results in an error 

397 if isinstance(cipher, AESCCM): 

398 _evp_cipher_set_length(backend, ctx, len(data)) 

399 

400 for ad in associated_data: 

401 _evp_cipher_process_aad(backend, ctx, ad) 

402 processed_data = _evp_cipher_process_data(backend, ctx, data) 

403 outlen = backend._ffi.new("int *") 

404 # All AEADs we support besides OCB are streaming so they return nothing 

405 # in finalization. OCB can return up to (16 byte block - 1) bytes so 

406 # we need a buffer here too. 

407 buf = backend._ffi.new("unsigned char[]", 16) 

408 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen) 

409 backend.openssl_assert(res != 0) 

410 processed_data += backend._ffi.buffer(buf, outlen[0])[:] 

411 tag_buf = backend._ffi.new("unsigned char[]", tag_length) 

412 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

413 ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf 

414 ) 

415 backend.openssl_assert(res != 0) 

416 tag = backend._ffi.buffer(tag_buf)[:] 

417 

418 return processed_data + tag 

419 

420 

421def _evp_cipher_decrypt( 

422 backend: Backend, 

423 cipher: _AEADTypes, 

424 nonce: bytes, 

425 data: bytes, 

426 associated_data: list[bytes], 

427 tag_length: int, 

428 ctx: typing.Any = None, 

429) -> bytes: 

430 from cryptography.hazmat.primitives.ciphers.aead import AESCCM 

431 

432 if len(data) < tag_length: 

433 raise InvalidTag 

434 

435 tag = data[-tag_length:] 

436 data = data[:-tag_length] 

437 if ctx is None: 

438 cipher_name = _evp_cipher_cipher_name(cipher) 

439 ctx = _evp_cipher_aead_setup( 

440 backend, 

441 cipher_name, 

442 cipher._key, 

443 nonce, 

444 tag, 

445 tag_length, 

446 _DECRYPT, 

447 ) 

448 else: 

449 _evp_cipher_set_nonce_operation(backend, ctx, nonce, _DECRYPT) 

450 _evp_cipher_set_tag(backend, ctx, tag) 

451 

452 # CCM requires us to pass the length of the data before processing 

453 # anything. 

454 # However calling this with any other AEAD results in an error 

455 if isinstance(cipher, AESCCM): 

456 _evp_cipher_set_length(backend, ctx, len(data)) 

457 

458 for ad in associated_data: 

459 _evp_cipher_process_aad(backend, ctx, ad) 

460 # CCM has a different error path if the tag doesn't match. Errors are 

461 # raised in Update and Final is irrelevant. 

462 if isinstance(cipher, AESCCM): 

463 outlen = backend._ffi.new("int *") 

464 buf = backend._ffi.new("unsigned char[]", len(data)) 

465 d_ptr = backend._ffi.from_buffer(data) 

466 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, d_ptr, len(data)) 

467 if res != 1: 

468 backend._consume_errors() 

469 raise InvalidTag 

470 

471 processed_data = backend._ffi.buffer(buf, outlen[0])[:] 

472 else: 

473 processed_data = _evp_cipher_process_data(backend, ctx, data) 

474 outlen = backend._ffi.new("int *") 

475 # OCB can return up to 15 bytes (16 byte block - 1) in finalization 

476 buf = backend._ffi.new("unsigned char[]", 16) 

477 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen) 

478 processed_data += backend._ffi.buffer(buf, outlen[0])[:] 

479 if res == 0: 

480 backend._consume_errors() 

481 raise InvalidTag 

482 

483 return processed_data