Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 12%

213 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 06:13 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import typing 

8 

9from cryptography.exceptions import InvalidTag 

10 

11if typing.TYPE_CHECKING: 

12 from cryptography.hazmat.backends.openssl.backend import Backend 

13 from cryptography.hazmat.primitives.ciphers.aead import ( 

14 AESCCM, 

15 AESGCM, 

16 AESOCB3, 

17 AESSIV, 

18 ChaCha20Poly1305, 

19 ) 

20 

21 _AEADTypes = typing.Union[ 

22 AESCCM, AESGCM, AESOCB3, AESSIV, ChaCha20Poly1305 

23 ] 

24 

25 

26def _is_evp_aead_supported_cipher( 

27 backend: Backend, cipher: _AEADTypes 

28) -> bool: 

29 """ 

30 Checks whether the given cipher is supported through 

31 EVP_AEAD rather than the normal OpenSSL EVP_CIPHER API. 

32 """ 

33 from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 

34 

35 return backend._lib.Cryptography_HAS_EVP_AEAD and isinstance( 

36 cipher, ChaCha20Poly1305 

37 ) 

38 

39 

40def _aead_cipher_supported(backend: Backend, cipher: _AEADTypes) -> bool: 

41 if _is_evp_aead_supported_cipher(backend, cipher): 

42 return True 

43 else: 

44 cipher_name = _evp_cipher_cipher_name(cipher) 

45 if backend._fips_enabled and cipher_name not in backend._fips_aead: 

46 return False 

47 # SIV isn't loaded through get_cipherbyname but instead a new fetch API 

48 # only available in 3.0+. But if we know we're on 3.0+ then we know 

49 # it's supported. 

50 if cipher_name.endswith(b"-siv"): 

51 return backend._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1 

52 else: 

53 return ( 

54 backend._lib.EVP_get_cipherbyname(cipher_name) 

55 != backend._ffi.NULL 

56 ) 

57 

58 

59def _aead_create_ctx( 

60 backend: Backend, 

61 cipher: _AEADTypes, 

62 key: bytes, 

63): 

64 if _is_evp_aead_supported_cipher(backend, cipher): 

65 return _evp_aead_create_ctx(backend, cipher, key) 

66 else: 

67 return _evp_cipher_create_ctx(backend, cipher, key) 

68 

69 

70def _encrypt( 

71 backend: Backend, 

72 cipher: _AEADTypes, 

73 nonce: bytes, 

74 data: bytes, 

75 associated_data: typing.List[bytes], 

76 tag_length: int, 

77 ctx: typing.Any = None, 

78) -> bytes: 

79 if _is_evp_aead_supported_cipher(backend, cipher): 

80 return _evp_aead_encrypt( 

81 backend, cipher, nonce, data, associated_data, tag_length, ctx 

82 ) 

83 else: 

84 return _evp_cipher_encrypt( 

85 backend, cipher, nonce, data, associated_data, tag_length, ctx 

86 ) 

87 

88 

89def _decrypt( 

90 backend: Backend, 

91 cipher: _AEADTypes, 

92 nonce: bytes, 

93 data: bytes, 

94 associated_data: typing.List[bytes], 

95 tag_length: int, 

96 ctx: typing.Any = None, 

97) -> bytes: 

98 if _is_evp_aead_supported_cipher(backend, cipher): 

99 return _evp_aead_decrypt( 

100 backend, cipher, nonce, data, associated_data, tag_length, ctx 

101 ) 

102 else: 

103 return _evp_cipher_decrypt( 

104 backend, cipher, nonce, data, associated_data, tag_length, ctx 

105 ) 

106 

107 

108def _evp_aead_create_ctx( 

109 backend: Backend, 

110 cipher: _AEADTypes, 

111 key: bytes, 

112 tag_len: typing.Optional[int] = None, 

113): 

114 aead_cipher = _evp_aead_get_cipher(backend, cipher) 

115 assert aead_cipher is not None 

116 key_ptr = backend._ffi.from_buffer(key) 

117 tag_len = ( 

118 backend._lib.EVP_AEAD_DEFAULT_TAG_LENGTH 

119 if tag_len is None 

120 else tag_len 

121 ) 

122 ctx = backend._lib.Cryptography_EVP_AEAD_CTX_new( 

123 aead_cipher, key_ptr, len(key), tag_len 

124 ) 

125 backend.openssl_assert(ctx != backend._ffi.NULL) 

126 ctx = backend._ffi.gc(ctx, backend._lib.EVP_AEAD_CTX_free) 

127 return ctx 

128 

129 

130def _evp_aead_get_cipher(backend: Backend, cipher: _AEADTypes): 

131 from cryptography.hazmat.primitives.ciphers.aead import ( 

132 ChaCha20Poly1305, 

133 ) 

134 

135 # Currently only ChaCha20-Poly1305 is supported using this API 

136 assert isinstance(cipher, ChaCha20Poly1305) 

137 return backend._lib.EVP_aead_chacha20_poly1305() 

138 

139 

140def _evp_aead_encrypt( 

141 backend: Backend, 

142 cipher: _AEADTypes, 

143 nonce: bytes, 

144 data: bytes, 

145 associated_data: typing.List[bytes], 

146 tag_length: int, 

147 ctx: typing.Any, 

148) -> bytes: 

149 assert ctx is not None 

150 

151 aead_cipher = _evp_aead_get_cipher(backend, cipher) 

152 assert aead_cipher is not None 

153 

154 out_len = backend._ffi.new("size_t *") 

155 # max_out_len should be in_len plus the result of 

156 # EVP_AEAD_max_overhead. 

157 max_out_len = len(data) + backend._lib.EVP_AEAD_max_overhead(aead_cipher) 

158 out_buf = backend._ffi.new("uint8_t[]", max_out_len) 

159 data_ptr = backend._ffi.from_buffer(data) 

160 nonce_ptr = backend._ffi.from_buffer(nonce) 

161 aad = b"".join(associated_data) 

162 aad_ptr = backend._ffi.from_buffer(aad) 

163 

164 res = backend._lib.EVP_AEAD_CTX_seal( 

165 ctx, 

166 out_buf, 

167 out_len, 

168 max_out_len, 

169 nonce_ptr, 

170 len(nonce), 

171 data_ptr, 

172 len(data), 

173 aad_ptr, 

174 len(aad), 

175 ) 

176 backend.openssl_assert(res == 1) 

177 encrypted_data = backend._ffi.buffer(out_buf, out_len[0])[:] 

178 return encrypted_data 

179 

180 

181def _evp_aead_decrypt( 

182 backend: Backend, 

183 cipher: _AEADTypes, 

184 nonce: bytes, 

185 data: bytes, 

186 associated_data: typing.List[bytes], 

187 tag_length: int, 

188 ctx: typing.Any, 

189) -> bytes: 

190 if len(data) < tag_length: 

191 raise InvalidTag 

192 

193 assert ctx is not None 

194 

195 out_len = backend._ffi.new("size_t *") 

196 # max_out_len should at least in_len 

197 max_out_len = len(data) 

198 out_buf = backend._ffi.new("uint8_t[]", max_out_len) 

199 data_ptr = backend._ffi.from_buffer(data) 

200 nonce_ptr = backend._ffi.from_buffer(nonce) 

201 aad = b"".join(associated_data) 

202 aad_ptr = backend._ffi.from_buffer(aad) 

203 

204 res = backend._lib.EVP_AEAD_CTX_open( 

205 ctx, 

206 out_buf, 

207 out_len, 

208 max_out_len, 

209 nonce_ptr, 

210 len(nonce), 

211 data_ptr, 

212 len(data), 

213 aad_ptr, 

214 len(aad), 

215 ) 

216 

217 if res == 0: 

218 backend._consume_errors() 

219 raise InvalidTag 

220 

221 decrypted_data = backend._ffi.buffer(out_buf, out_len[0])[:] 

222 return decrypted_data 

223 

224 

225_ENCRYPT = 1 

226_DECRYPT = 0 

227 

228 

229def _evp_cipher_cipher_name(cipher: _AEADTypes) -> bytes: 

230 from cryptography.hazmat.primitives.ciphers.aead import ( 

231 AESCCM, 

232 AESGCM, 

233 AESOCB3, 

234 AESSIV, 

235 ChaCha20Poly1305, 

236 ) 

237 

238 if isinstance(cipher, ChaCha20Poly1305): 

239 return b"chacha20-poly1305" 

240 elif isinstance(cipher, AESCCM): 

241 return f"aes-{len(cipher._key) * 8}-ccm".encode("ascii") 

242 elif isinstance(cipher, AESOCB3): 

243 return f"aes-{len(cipher._key) * 8}-ocb".encode("ascii") 

244 elif isinstance(cipher, AESSIV): 

245 return f"aes-{len(cipher._key) * 8 // 2}-siv".encode("ascii") 

246 else: 

247 assert isinstance(cipher, AESGCM) 

248 return f"aes-{len(cipher._key) * 8}-gcm".encode("ascii") 

249 

250 

251def _evp_cipher(cipher_name: bytes, backend: Backend): 

252 if cipher_name.endswith(b"-siv"): 

253 evp_cipher = backend._lib.EVP_CIPHER_fetch( 

254 backend._ffi.NULL, 

255 cipher_name, 

256 backend._ffi.NULL, 

257 ) 

258 backend.openssl_assert(evp_cipher != backend._ffi.NULL) 

259 evp_cipher = backend._ffi.gc(evp_cipher, backend._lib.EVP_CIPHER_free) 

260 else: 

261 evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name) 

262 backend.openssl_assert(evp_cipher != backend._ffi.NULL) 

263 

264 return evp_cipher 

265 

266 

267def _evp_cipher_create_ctx( 

268 backend: Backend, 

269 cipher: _AEADTypes, 

270 key: bytes, 

271): 

272 ctx = backend._lib.EVP_CIPHER_CTX_new() 

273 backend.openssl_assert(ctx != backend._ffi.NULL) 

274 ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free) 

275 cipher_name = _evp_cipher_cipher_name(cipher) 

276 evp_cipher = _evp_cipher(cipher_name, backend) 

277 key_ptr = backend._ffi.from_buffer(key) 

278 res = backend._lib.EVP_CipherInit_ex( 

279 ctx, 

280 evp_cipher, 

281 backend._ffi.NULL, 

282 key_ptr, 

283 backend._ffi.NULL, 

284 0, 

285 ) 

286 backend.openssl_assert(res != 0) 

287 return ctx 

288 

289 

290def _evp_cipher_aead_setup( 

291 backend: Backend, 

292 cipher_name: bytes, 

293 key: bytes, 

294 nonce: bytes, 

295 tag: typing.Optional[bytes], 

296 tag_len: int, 

297 operation: int, 

298): 

299 evp_cipher = _evp_cipher(cipher_name, backend) 

300 ctx = backend._lib.EVP_CIPHER_CTX_new() 

301 ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free) 

302 res = backend._lib.EVP_CipherInit_ex( 

303 ctx, 

304 evp_cipher, 

305 backend._ffi.NULL, 

306 backend._ffi.NULL, 

307 backend._ffi.NULL, 

308 int(operation == _ENCRYPT), 

309 ) 

310 backend.openssl_assert(res != 0) 

311 # CCM requires the IVLEN to be set before calling SET_TAG on decrypt 

312 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

313 ctx, 

314 backend._lib.EVP_CTRL_AEAD_SET_IVLEN, 

315 len(nonce), 

316 backend._ffi.NULL, 

317 ) 

318 backend.openssl_assert(res != 0) 

319 if operation == _DECRYPT: 

320 assert tag is not None 

321 _evp_cipher_set_tag(backend, ctx, tag) 

322 elif cipher_name.endswith(b"-ccm"): 

323 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

324 ctx, 

325 backend._lib.EVP_CTRL_AEAD_SET_TAG, 

326 tag_len, 

327 backend._ffi.NULL, 

328 ) 

329 backend.openssl_assert(res != 0) 

330 

331 nonce_ptr = backend._ffi.from_buffer(nonce) 

332 key_ptr = backend._ffi.from_buffer(key) 

333 res = backend._lib.EVP_CipherInit_ex( 

334 ctx, 

335 backend._ffi.NULL, 

336 backend._ffi.NULL, 

337 key_ptr, 

338 nonce_ptr, 

339 int(operation == _ENCRYPT), 

340 ) 

341 backend.openssl_assert(res != 0) 

342 return ctx 

343 

344 

345def _evp_cipher_set_tag(backend, ctx, tag: bytes) -> None: 

346 tag_ptr = backend._ffi.from_buffer(tag) 

347 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

348 ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag_ptr 

349 ) 

350 backend.openssl_assert(res != 0) 

351 

352 

353def _evp_cipher_set_nonce_operation( 

354 backend, ctx, nonce: bytes, operation: int 

355) -> None: 

356 nonce_ptr = backend._ffi.from_buffer(nonce) 

357 res = backend._lib.EVP_CipherInit_ex( 

358 ctx, 

359 backend._ffi.NULL, 

360 backend._ffi.NULL, 

361 backend._ffi.NULL, 

362 nonce_ptr, 

363 int(operation == _ENCRYPT), 

364 ) 

365 backend.openssl_assert(res != 0) 

366 

367 

368def _evp_cipher_set_length(backend: Backend, ctx, data_len: int) -> None: 

369 intptr = backend._ffi.new("int *") 

370 res = backend._lib.EVP_CipherUpdate( 

371 ctx, backend._ffi.NULL, intptr, backend._ffi.NULL, data_len 

372 ) 

373 backend.openssl_assert(res != 0) 

374 

375 

376def _evp_cipher_process_aad( 

377 backend: Backend, ctx, associated_data: bytes 

378) -> None: 

379 outlen = backend._ffi.new("int *") 

380 a_data_ptr = backend._ffi.from_buffer(associated_data) 

381 res = backend._lib.EVP_CipherUpdate( 

382 ctx, backend._ffi.NULL, outlen, a_data_ptr, len(associated_data) 

383 ) 

384 backend.openssl_assert(res != 0) 

385 

386 

387def _evp_cipher_process_data(backend: Backend, ctx, data: bytes) -> bytes: 

388 outlen = backend._ffi.new("int *") 

389 buf = backend._ffi.new("unsigned char[]", len(data)) 

390 data_ptr = backend._ffi.from_buffer(data) 

391 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data_ptr, len(data)) 

392 if res == 0: 

393 # AES SIV can error here if the data is invalid on decrypt 

394 backend._consume_errors() 

395 raise InvalidTag 

396 return backend._ffi.buffer(buf, outlen[0])[:] 

397 

398 

399def _evp_cipher_encrypt( 

400 backend: Backend, 

401 cipher: _AEADTypes, 

402 nonce: bytes, 

403 data: bytes, 

404 associated_data: typing.List[bytes], 

405 tag_length: int, 

406 ctx: typing.Any = None, 

407) -> bytes: 

408 from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV 

409 

410 if ctx is None: 

411 cipher_name = _evp_cipher_cipher_name(cipher) 

412 ctx = _evp_cipher_aead_setup( 

413 backend, 

414 cipher_name, 

415 cipher._key, 

416 nonce, 

417 None, 

418 tag_length, 

419 _ENCRYPT, 

420 ) 

421 else: 

422 _evp_cipher_set_nonce_operation(backend, ctx, nonce, _ENCRYPT) 

423 

424 # CCM requires us to pass the length of the data before processing 

425 # anything. 

426 # However calling this with any other AEAD results in an error 

427 if isinstance(cipher, AESCCM): 

428 _evp_cipher_set_length(backend, ctx, len(data)) 

429 

430 for ad in associated_data: 

431 _evp_cipher_process_aad(backend, ctx, ad) 

432 processed_data = _evp_cipher_process_data(backend, ctx, data) 

433 outlen = backend._ffi.new("int *") 

434 # All AEADs we support besides OCB are streaming so they return nothing 

435 # in finalization. OCB can return up to (16 byte block - 1) bytes so 

436 # we need a buffer here too. 

437 buf = backend._ffi.new("unsigned char[]", 16) 

438 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen) 

439 backend.openssl_assert(res != 0) 

440 processed_data += backend._ffi.buffer(buf, outlen[0])[:] 

441 tag_buf = backend._ffi.new("unsigned char[]", tag_length) 

442 res = backend._lib.EVP_CIPHER_CTX_ctrl( 

443 ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf 

444 ) 

445 backend.openssl_assert(res != 0) 

446 tag = backend._ffi.buffer(tag_buf)[:] 

447 

448 if isinstance(cipher, AESSIV): 

449 # RFC 5297 defines the output as IV || C, where the tag we generate 

450 # is the "IV" and C is the ciphertext. This is the opposite of our 

451 # other AEADs, which are Ciphertext || Tag 

452 backend.openssl_assert(len(tag) == 16) 

453 return tag + processed_data 

454 else: 

455 return processed_data + tag 

456 

457 

458def _evp_cipher_decrypt( 

459 backend: Backend, 

460 cipher: _AEADTypes, 

461 nonce: bytes, 

462 data: bytes, 

463 associated_data: typing.List[bytes], 

464 tag_length: int, 

465 ctx: typing.Any = None, 

466) -> bytes: 

467 from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV 

468 

469 if len(data) < tag_length: 

470 raise InvalidTag 

471 

472 if isinstance(cipher, AESSIV): 

473 # RFC 5297 defines the output as IV || C, where the tag we generate 

474 # is the "IV" and C is the ciphertext. This is the opposite of our 

475 # other AEADs, which are Ciphertext || Tag 

476 tag = data[:tag_length] 

477 data = data[tag_length:] 

478 else: 

479 tag = data[-tag_length:] 

480 data = data[:-tag_length] 

481 if ctx is None: 

482 cipher_name = _evp_cipher_cipher_name(cipher) 

483 ctx = _evp_cipher_aead_setup( 

484 backend, 

485 cipher_name, 

486 cipher._key, 

487 nonce, 

488 tag, 

489 tag_length, 

490 _DECRYPT, 

491 ) 

492 else: 

493 _evp_cipher_set_nonce_operation(backend, ctx, nonce, _DECRYPT) 

494 _evp_cipher_set_tag(backend, ctx, tag) 

495 

496 # CCM requires us to pass the length of the data before processing 

497 # anything. 

498 # However calling this with any other AEAD results in an error 

499 if isinstance(cipher, AESCCM): 

500 _evp_cipher_set_length(backend, ctx, len(data)) 

501 

502 for ad in associated_data: 

503 _evp_cipher_process_aad(backend, ctx, ad) 

504 # CCM has a different error path if the tag doesn't match. Errors are 

505 # raised in Update and Final is irrelevant. 

506 if isinstance(cipher, AESCCM): 

507 outlen = backend._ffi.new("int *") 

508 buf = backend._ffi.new("unsigned char[]", len(data)) 

509 d_ptr = backend._ffi.from_buffer(data) 

510 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, d_ptr, len(data)) 

511 if res != 1: 

512 backend._consume_errors() 

513 raise InvalidTag 

514 

515 processed_data = backend._ffi.buffer(buf, outlen[0])[:] 

516 else: 

517 processed_data = _evp_cipher_process_data(backend, ctx, data) 

518 outlen = backend._ffi.new("int *") 

519 # OCB can return up to 15 bytes (16 byte block - 1) in finalization 

520 buf = backend._ffi.new("unsigned char[]", 16) 

521 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen) 

522 processed_data += backend._ffi.buffer(buf, outlen[0])[:] 

523 if res == 0: 

524 backend._consume_errors() 

525 raise InvalidTag 

526 

527 return processed_data