Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 12%
213 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:13 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:13 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5from __future__ import annotations
7import typing
9from cryptography.exceptions import InvalidTag
11if typing.TYPE_CHECKING:
12 from cryptography.hazmat.backends.openssl.backend import Backend
13 from cryptography.hazmat.primitives.ciphers.aead import (
14 AESCCM,
15 AESGCM,
16 AESOCB3,
17 AESSIV,
18 ChaCha20Poly1305,
19 )
21 _AEADTypes = typing.Union[
22 AESCCM, AESGCM, AESOCB3, AESSIV, ChaCha20Poly1305
23 ]
def _is_evp_aead_supported_cipher(
    backend: Backend, cipher: _AEADTypes
) -> bool:
    """
    Checks whether the given cipher is supported through
    EVP_AEAD rather than the normal OpenSSL EVP_CIPHER API.

    This is true only when the bindings report Cryptography_HAS_EVP_AEAD
    and the cipher is a ChaCha20Poly1305 instance.
    """
    from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305

    # ``and`` returns its first operand when that operand is falsy, so
    # without bool() a falsy flag value would be returned as-is instead of
    # False. Coerce so the result always matches the declared return type.
    return bool(
        backend._lib.Cryptography_HAS_EVP_AEAD
        and isinstance(cipher, ChaCha20Poly1305)
    )
def _aead_cipher_supported(backend: Backend, cipher: _AEADTypes) -> bool:
    """
    Reports whether the backend can provide the given AEAD cipher, either
    through EVP_AEAD or through the regular EVP_CIPHER API.
    """
    if _is_evp_aead_supported_cipher(backend, cipher):
        return True

    name = _evp_cipher_cipher_name(cipher)
    # With FIPS enabled, only names listed in backend._fips_aead are allowed.
    if backend._fips_enabled and name not in backend._fips_aead:
        return False

    # SIV isn't loaded through get_cipherbyname but instead a new fetch API
    # only available in 3.0+. But if we know we're on 3.0+ then we know
    # it's supported.
    if name.endswith(b"-siv"):
        return backend._lib.CRYPTOGRAPHY_OPENSSL_300_OR_GREATER == 1

    looked_up = backend._lib.EVP_get_cipherbyname(name)
    return looked_up != backend._ffi.NULL
def _aead_create_ctx(
    backend: Backend,
    cipher: _AEADTypes,
    key: bytes,
):
    """
    Builds a context for ``cipher`` keyed with ``key``, using the EVP_AEAD
    constructor when the cipher is supported there and the EVP_CIPHER
    constructor otherwise.
    """
    make_ctx = (
        _evp_aead_create_ctx
        if _is_evp_aead_supported_cipher(backend, cipher)
        else _evp_cipher_create_ctx
    )
    return make_ctx(backend, cipher, key)
def _encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Encrypts ``data`` with the given AEAD cipher.

    Dispatches to the EVP_AEAD implementation when the cipher is supported
    there and to the EVP_CIPHER implementation otherwise; every argument is
    forwarded unchanged.
    """
    if _is_evp_aead_supported_cipher(backend, cipher):
        do_encrypt = _evp_aead_encrypt
    else:
        do_encrypt = _evp_cipher_encrypt
    return do_encrypt(
        backend, cipher, nonce, data, associated_data, tag_length, ctx
    )
def _decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Decrypts ``data`` with the given AEAD cipher.

    Dispatches to the EVP_AEAD implementation when the cipher is supported
    there and to the EVP_CIPHER implementation otherwise; every argument is
    forwarded unchanged.
    """
    if _is_evp_aead_supported_cipher(backend, cipher):
        do_decrypt = _evp_aead_decrypt
    else:
        do_decrypt = _evp_cipher_decrypt
    return do_decrypt(
        backend, cipher, nonce, data, associated_data, tag_length, ctx
    )
def _evp_aead_create_ctx(
    backend: Backend,
    cipher: _AEADTypes,
    key: bytes,
    tag_len: typing.Optional[int] = None,
):
    """
    Allocates an EVP_AEAD context for ``cipher`` keyed with ``key``.

    When ``tag_len`` is None the library's EVP_AEAD_DEFAULT_TAG_LENGTH is
    used. The returned context is wrapped so that EVP_AEAD_CTX_free runs
    when it is garbage collected.
    """
    aead_cipher = _evp_aead_get_cipher(backend, cipher)
    assert aead_cipher is not None

    key_buf = backend._ffi.from_buffer(key)
    if tag_len is None:
        tag_len = backend._lib.EVP_AEAD_DEFAULT_TAG_LENGTH

    raw_ctx = backend._lib.Cryptography_EVP_AEAD_CTX_new(
        aead_cipher, key_buf, len(key), tag_len
    )
    backend.openssl_assert(raw_ctx != backend._ffi.NULL)
    return backend._ffi.gc(raw_ctx, backend._lib.EVP_AEAD_CTX_free)
def _evp_aead_get_cipher(backend: Backend, cipher: _AEADTypes):
    """
    Returns the EVP_AEAD cipher object for ``cipher``.

    Currently only ChaCha20-Poly1305 is supported using this API, so any
    other cipher type trips the assertion.
    """
    from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305

    assert isinstance(cipher, ChaCha20Poly1305)
    chacha_aead = backend._lib.EVP_aead_chacha20_poly1305()
    return chacha_aead
def _evp_aead_encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any,
) -> bytes:
    """
    Seals ``data`` with an EVP_AEAD context and returns the bytes that
    EVP_AEAD_CTX_seal wrote.

    The entries of ``associated_data`` are concatenated and passed as one
    AAD buffer. ``tag_length`` is not used by this function. ``ctx`` must
    not be None.
    """
    assert ctx is not None

    aead_cipher = _evp_aead_get_cipher(backend, cipher)
    assert aead_cipher is not None

    ffi = backend._ffi
    lib = backend._lib

    written = ffi.new("size_t *")
    # The output buffer has to hold the input plus whatever
    # EVP_AEAD_max_overhead reports for this cipher.
    capacity = len(data) + lib.EVP_AEAD_max_overhead(aead_cipher)
    sealed = ffi.new("uint8_t[]", capacity)
    data_buf = ffi.from_buffer(data)
    nonce_buf = ffi.from_buffer(nonce)
    aad = b"".join(associated_data)
    aad_buf = ffi.from_buffer(aad)

    status = lib.EVP_AEAD_CTX_seal(
        ctx,
        sealed,
        written,
        capacity,
        nonce_buf,
        len(nonce),
        data_buf,
        len(data),
        aad_buf,
        len(aad),
    )
    backend.openssl_assert(status == 1)
    # Copy out only the number of bytes the library reported as written.
    return ffi.buffer(sealed, written[0])[:]
def _evp_aead_decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any,
) -> bytes:
    """
    Opens ``data`` with an EVP_AEAD context and returns the bytes that
    EVP_AEAD_CTX_open wrote.

    The entries of ``associated_data`` are concatenated and passed as one
    AAD buffer. Raises InvalidTag when ``data`` is shorter than
    ``tag_length`` or when EVP_AEAD_CTX_open returns 0. ``ctx`` must not be
    None.
    """
    if len(data) < tag_length:
        raise InvalidTag

    assert ctx is not None

    ffi = backend._ffi

    written = ffi.new("size_t *")
    # The output buffer is sized to the full input length.
    capacity = len(data)
    opened = ffi.new("uint8_t[]", capacity)
    data_buf = ffi.from_buffer(data)
    nonce_buf = ffi.from_buffer(nonce)
    aad = b"".join(associated_data)
    aad_buf = ffi.from_buffer(aad)

    status = backend._lib.EVP_AEAD_CTX_open(
        ctx,
        opened,
        written,
        capacity,
        nonce_buf,
        len(nonce),
        data_buf,
        len(data),
        aad_buf,
        len(aad),
    )

    if status != 0:
        return ffi.buffer(opened, written[0])[:]

    # Clear the library error queue before reporting the failure.
    backend._consume_errors()
    raise InvalidTag
225_ENCRYPT = 1
226_DECRYPT = 0
def _evp_cipher_cipher_name(cipher: _AEADTypes) -> bytes:
    """
    Builds the OpenSSL cipher name (ASCII bytes) for an AEAD instance.

    ChaCha20Poly1305 maps to a fixed name. The AES variants are named
    ``aes-<bits>-<mode>`` where ``<bits>`` is the key length in bits; for
    AESSIV half of the key length is used.
    """
    from cryptography.hazmat.primitives.ciphers.aead import (
        AESCCM,
        AESGCM,
        AESOCB3,
        AESSIV,
        ChaCha20Poly1305,
    )

    if isinstance(cipher, ChaCha20Poly1305):
        return b"chacha20-poly1305"

    # Pick the mode suffix first; the key length is read only afterwards so
    # an unexpected type fails on the assertion below.
    if isinstance(cipher, AESCCM):
        mode = "ccm"
    elif isinstance(cipher, AESOCB3):
        mode = "ocb"
    elif isinstance(cipher, AESSIV):
        mode = "siv"
    else:
        assert isinstance(cipher, AESGCM)
        mode = "gcm"

    key_bits = len(cipher._key) * 8
    if mode == "siv":
        key_bits //= 2
    return f"aes-{key_bits}-{mode}".encode("ascii")
def _evp_cipher(cipher_name: bytes, backend: Backend):
    """
    Resolves ``cipher_name`` to an EVP_CIPHER pointer.

    Names ending in ``-siv`` are obtained with EVP_CIPHER_fetch and wrapped
    so EVP_CIPHER_free runs on garbage collection; all other names go
    through EVP_get_cipherbyname. A NULL result fails openssl_assert.
    """
    null = backend._ffi.NULL

    if not cipher_name.endswith(b"-siv"):
        looked_up = backend._lib.EVP_get_cipherbyname(cipher_name)
        backend.openssl_assert(looked_up != null)
        return looked_up

    fetched = backend._lib.EVP_CIPHER_fetch(null, cipher_name, null)
    backend.openssl_assert(fetched != null)
    return backend._ffi.gc(fetched, backend._lib.EVP_CIPHER_free)
def _evp_cipher_create_ctx(
    backend: Backend,
    cipher: _AEADTypes,
    key: bytes,
):
    """
    Allocates an EVP_CIPHER_CTX and initialises it with the cipher derived
    from ``cipher`` and with ``key``.

    No IV is supplied here and the ``enc`` argument is passed as 0. The
    context is wrapped so EVP_CIPHER_CTX_free runs on garbage collection.
    """
    ffi = backend._ffi
    lib = backend._lib

    raw_ctx = lib.EVP_CIPHER_CTX_new()
    backend.openssl_assert(raw_ctx != ffi.NULL)
    ctx = ffi.gc(raw_ctx, lib.EVP_CIPHER_CTX_free)

    evp_cipher = _evp_cipher(_evp_cipher_cipher_name(cipher), backend)
    key_buf = ffi.from_buffer(key)
    status = lib.EVP_CipherInit_ex(
        ctx, evp_cipher, ffi.NULL, key_buf, ffi.NULL, 0
    )
    backend.openssl_assert(status != 0)
    return ctx
def _evp_cipher_aead_setup(
    backend: Backend,
    cipher_name: bytes,
    key: bytes,
    nonce: bytes,
    tag: typing.Optional[bytes],
    tag_len: int,
    operation: int,
):
    """
    Creates a fresh EVP_CIPHER_CTX for one AEAD operation and initialises
    it with the cipher, IV length, tag information, key and nonce.

    ``operation`` is compared against ``_ENCRYPT`` / ``_DECRYPT``. When
    decrypting, ``tag`` must not be None and is installed on the context.
    Otherwise, for cipher names ending in ``-ccm``, the tag length
    ``tag_len`` is set instead. Returns the context, wrapped so
    EVP_CIPHER_CTX_free runs on garbage collection.
    """
    evp_cipher = _evp_cipher(cipher_name, backend)
    ctx = backend._lib.EVP_CIPHER_CTX_new()
    # Fail immediately on allocation failure rather than passing NULL to
    # the calls below; _evp_cipher_create_ctx performs the same check.
    backend.openssl_assert(ctx != backend._ffi.NULL)
    ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)

    # The same enc flag is needed by both EVP_CipherInit_ex calls.
    enc_flag = int(operation == _ENCRYPT)

    # First init: select the cipher only; key and IV follow later.
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        evp_cipher,
        backend._ffi.NULL,
        backend._ffi.NULL,
        backend._ffi.NULL,
        enc_flag,
    )
    backend.openssl_assert(res != 0)
    # CCM requires the IVLEN to be set before calling SET_TAG on decrypt
    res = backend._lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
        len(nonce),
        backend._ffi.NULL,
    )
    backend.openssl_assert(res != 0)
    if operation == _DECRYPT:
        assert tag is not None
        _evp_cipher_set_tag(backend, ctx, tag)
    elif cipher_name.endswith(b"-ccm"):
        # No tag bytes exist yet, so only the length is passed, with a
        # NULL tag pointer.
        res = backend._lib.EVP_CIPHER_CTX_ctrl(
            ctx,
            backend._lib.EVP_CTRL_AEAD_SET_TAG,
            tag_len,
            backend._ffi.NULL,
        )
        backend.openssl_assert(res != 0)

    # Second init: supply the key and nonce, leaving the cipher untouched.
    nonce_ptr = backend._ffi.from_buffer(nonce)
    key_ptr = backend._ffi.from_buffer(key)
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        backend._ffi.NULL,
        backend._ffi.NULL,
        key_ptr,
        nonce_ptr,
        enc_flag,
    )
    backend.openssl_assert(res != 0)
    return ctx
def _evp_cipher_set_tag(backend, ctx, tag: bytes) -> None:
    """
    Installs ``tag`` on ``ctx`` through the EVP_CTRL_AEAD_SET_TAG control;
    a zero result from the control call fails openssl_assert.
    """
    tag_buf = backend._ffi.from_buffer(tag)
    lib = backend._lib
    status = lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        lib.EVP_CTRL_AEAD_SET_TAG,
        len(tag),
        tag_buf,
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_set_nonce_operation(
    backend, ctx, nonce: bytes, operation: int
) -> None:
    """
    Re-initialises an existing ``ctx`` with a new ``nonce`` and direction.

    Cipher, engine and key arguments are passed as NULL. The ``enc`` flag
    is 1 when ``operation`` equals ``_ENCRYPT`` and 0 otherwise.
    """
    ffi = backend._ffi
    null = ffi.NULL
    nonce_buf = ffi.from_buffer(nonce)
    enc_flag = int(operation == _ENCRYPT)
    status = backend._lib.EVP_CipherInit_ex(
        ctx, null, null, null, nonce_buf, enc_flag
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_set_length(backend: Backend, ctx, data_len: int) -> None:
    """
    Passes ``data_len`` to ``ctx`` by calling EVP_CipherUpdate with NULL
    output and input buffers; a zero result fails openssl_assert.
    """
    null = backend._ffi.NULL
    # EVP_CipherUpdate needs somewhere to store an output length; the
    # value is not read afterwards.
    unused_len = backend._ffi.new("int *")
    status = backend._lib.EVP_CipherUpdate(
        ctx, null, unused_len, null, data_len
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_process_aad(
    backend: Backend, ctx, associated_data: bytes
) -> None:
    """
    Feeds ``associated_data`` into ``ctx`` by calling EVP_CipherUpdate with
    a NULL output buffer; a zero result fails openssl_assert.
    """
    ffi = backend._ffi
    unused_len = ffi.new("int *")
    aad_buf = ffi.from_buffer(associated_data)
    status = backend._lib.EVP_CipherUpdate(
        ctx,
        ffi.NULL,
        unused_len,
        aad_buf,
        len(associated_data),
    )
    backend.openssl_assert(status != 0)
def _evp_cipher_process_data(backend: Backend, ctx, data: bytes) -> bytes:
    """
    Runs ``data`` through ``ctx`` with EVP_CipherUpdate and returns the
    bytes the call reported as written.

    Raises InvalidTag, after clearing the error queue, when the update
    call returns 0.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", len(data))
    in_buf = ffi.from_buffer(data)
    status = backend._lib.EVP_CipherUpdate(
        ctx, out_buf, written, in_buf, len(data)
    )
    if status != 0:
        return ffi.buffer(out_buf, written[0])[:]

    # AES SIV can error here if the data is invalid on decrypt
    backend._consume_errors()
    raise InvalidTag
def _evp_cipher_encrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Encrypts ``data`` through the EVP_CIPHER API and returns the ciphertext
    combined with the authentication tag.

    When ``ctx`` is None a new context is set up from ``cipher._key`` and
    ``nonce``; otherwise the supplied context is re-initialised with the
    nonce. Each entry of ``associated_data`` is fed in separately. The
    result is ``tag || ciphertext`` for AESSIV and ``ciphertext || tag``
    for every other cipher.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    if ctx is not None:
        _evp_cipher_set_nonce_operation(backend, ctx, nonce, _ENCRYPT)
    else:
        ctx = _evp_cipher_aead_setup(
            backend,
            _evp_cipher_cipher_name(cipher),
            cipher._key,
            nonce,
            None,
            tag_length,
            _ENCRYPT,
        )

    # CCM requires us to pass the length of the data before processing
    # anything.
    # However calling this with any other AEAD results in an error
    if isinstance(cipher, AESCCM):
        _evp_cipher_set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _evp_cipher_process_aad(backend, ctx, aad_chunk)
    ciphertext = _evp_cipher_process_data(backend, ctx, data)

    ffi = backend._ffi
    lib = backend._lib

    final_len = ffi.new("int *")
    # All AEADs we support besides OCB are streaming so they return nothing
    # in finalization. OCB can return up to (16 byte block - 1) bytes so
    # we need a buffer here too.
    final_buf = ffi.new("unsigned char[]", 16)
    status = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
    backend.openssl_assert(status != 0)
    ciphertext = ciphertext + ffi.buffer(final_buf, final_len[0])[:]

    tag_buf = ffi.new("unsigned char[]", tag_length)
    status = lib.EVP_CIPHER_CTX_ctrl(
        ctx, lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
    )
    backend.openssl_assert(status != 0)
    tag = ffi.buffer(tag_buf)[:]

    if not isinstance(cipher, AESSIV):
        return ciphertext + tag

    # RFC 5297 defines the output as IV || C, where the tag we generate
    # is the "IV" and C is the ciphertext. This is the opposite of our
    # other AEADs, which are Ciphertext || Tag
    backend.openssl_assert(len(tag) == 16)
    return tag + ciphertext
def _evp_cipher_decrypt(
    backend: Backend,
    cipher: _AEADTypes,
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """
    Decrypts ``data`` through the EVP_CIPHER API and returns the plaintext.

    ``data`` is split into tag and ciphertext: the tag is the first
    ``tag_length`` bytes for AESSIV and the last ``tag_length`` bytes for
    every other cipher. When ``ctx`` is None a new context is set up from
    ``cipher._key``; otherwise the supplied context is re-initialised with
    the nonce and tag.

    Raises InvalidTag when ``data`` is shorter than ``tag_length`` or when
    the update or finalization step reports failure.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    if len(data) < tag_length:
        raise InvalidTag

    if isinstance(cipher, AESSIV):
        # RFC 5297 defines the output as IV || C, where the tag we generate
        # is the "IV" and C is the ciphertext. This is the opposite of our
        # other AEADs, which are Ciphertext || Tag
        tag, data = data[:tag_length], data[tag_length:]
    else:
        tag, data = data[-tag_length:], data[:-tag_length]

    if ctx is not None:
        _evp_cipher_set_nonce_operation(backend, ctx, nonce, _DECRYPT)
        _evp_cipher_set_tag(backend, ctx, tag)
    else:
        ctx = _evp_cipher_aead_setup(
            backend,
            _evp_cipher_cipher_name(cipher),
            cipher._key,
            nonce,
            tag,
            tag_length,
            _DECRYPT,
        )

    is_ccm = isinstance(cipher, AESCCM)

    # CCM requires us to pass the length of the data before processing
    # anything.
    # However calling this with any other AEAD results in an error
    if is_ccm:
        _evp_cipher_set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _evp_cipher_process_aad(backend, ctx, aad_chunk)

    ffi = backend._ffi
    lib = backend._lib

    if not is_ccm:
        plaintext = _evp_cipher_process_data(backend, ctx, data)
        final_len = ffi.new("int *")
        # OCB can return up to 15 bytes (16 byte block - 1) in finalization
        final_buf = ffi.new("unsigned char[]", 16)
        status = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
        if status == 0:
            backend._consume_errors()
            raise InvalidTag
        return plaintext + ffi.buffer(final_buf, final_len[0])[:]

    # CCM has a different error path if the tag doesn't match. Errors are
    # raised in Update and Final is irrelevant.
    written = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", len(data))
    in_buf = ffi.from_buffer(data)
    status = lib.EVP_CipherUpdate(ctx, out_buf, written, in_buf, len(data))
    if status != 1:
        backend._consume_errors()
        raise InvalidTag
    return ffi.buffer(out_buf, written[0])[:]