Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 11%
143 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:36 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5import typing
7from cryptography.exceptions import InvalidTag
9if typing.TYPE_CHECKING:
10 from cryptography.hazmat.backends.openssl.backend import Backend
11 from cryptography.hazmat.primitives.ciphers.aead import (
12 AESCCM,
13 AESGCM,
14 AESOCB3,
15 AESSIV,
16 ChaCha20Poly1305,
17 )
19 _AEADTypes = typing.Union[
20 AESCCM, AESGCM, AESOCB3, AESSIV, ChaCha20Poly1305
21 ]
# Operation selectors used throughout this module. Callers pass one of these
# as ``operation``; the helpers below turn it into the final argument of
# EVP_CipherInit_ex with ``int(operation == _ENCRYPT)``.
_ENCRYPT = 1
_DECRYPT = 0
def _aead_cipher_name(cipher: "_AEADTypes") -> bytes:
    """Return the cipher name, as ASCII bytes, for an AEAD object.

    ChaCha20Poly1305 maps to a fixed name. Every AES variant embeds a bit
    size derived from ``len(cipher._key)``; for AES-SIV that size is halved
    before it is placed in the name.
    """
    # Imported lazily inside the function, mirroring the original layout.
    from cryptography.hazmat.primitives.ciphers.aead import (
        AESCCM,
        AESGCM,
        AESOCB3,
        AESSIV,
        ChaCha20Poly1305,
    )

    if isinstance(cipher, ChaCha20Poly1305):
        return b"chacha20-poly1305"
    if isinstance(cipher, AESCCM):
        return f"aes-{len(cipher._key) * 8}-ccm".encode("ascii")
    if isinstance(cipher, AESOCB3):
        return f"aes-{len(cipher._key) * 8}-ocb".encode("ascii")
    if isinstance(cipher, AESSIV):
        return f"aes-{len(cipher._key) * 8 // 2}-siv".encode("ascii")

    # Anything that reaches this point is expected to be AES-GCM.
    assert isinstance(cipher, AESGCM)
    return f"aes-{len(cipher._key) * 8}-gcm".encode("ascii")
def _evp_cipher(cipher_name: bytes, backend: "Backend"):
    """Look up the EVP_CIPHER object for ``cipher_name``.

    Names ending in ``-siv`` are obtained with ``EVP_CIPHER_fetch`` and the
    result is wrapped with ``ffi.gc`` so ``EVP_CIPHER_free`` is its
    destructor. Every other name goes through ``EVP_get_cipherbyname`` and
    is returned unwrapped. Both paths assert that the lookup did not
    return NULL.
    """
    ffi = backend._ffi
    lib = backend._lib

    if not cipher_name.endswith(b"-siv"):
        looked_up = lib.EVP_get_cipherbyname(cipher_name)
        backend.openssl_assert(looked_up != ffi.NULL)
        return looked_up

    fetched = lib.EVP_CIPHER_fetch(ffi.NULL, cipher_name, ffi.NULL)
    backend.openssl_assert(fetched != ffi.NULL)
    return ffi.gc(fetched, lib.EVP_CIPHER_free)
def _aead_create_ctx(
    backend: "Backend",
    cipher: "_AEADTypes",
    key: bytes,
):
    """Allocate a cipher context bound to ``cipher`` and ``key``.

    The context is wrapped with ``ffi.gc`` so ``EVP_CIPHER_CTX_free`` is its
    destructor. No nonce is supplied here, and the direction argument of
    ``EVP_CipherInit_ex`` is passed as 0.
    NOTE(review): presumably the nonce and real direction are supplied later
    through ``_set_nonce_operation`` -- confirm against the callers.
    """
    ffi = backend._ffi
    lib = backend._lib

    raw_ctx = lib.EVP_CIPHER_CTX_new()
    backend.openssl_assert(raw_ctx != ffi.NULL)
    managed_ctx = ffi.gc(raw_ctx, lib.EVP_CIPHER_CTX_free)

    evp = _evp_cipher(_aead_cipher_name(cipher), backend)
    key_buf = ffi.from_buffer(key)
    status = lib.EVP_CipherInit_ex(
        managed_ctx, evp, ffi.NULL, key_buf, ffi.NULL, 0
    )
    backend.openssl_assert(status != 0)
    return managed_ctx
def _aead_setup(
    backend: "Backend",
    cipher_name: bytes,
    key: bytes,
    nonce: bytes,
    tag: typing.Optional[bytes],
    tag_len: int,
    operation: int,
):
    """Build a fully initialised cipher context for a single operation.

    Steps, in the order OpenSSL needs them:

    1. initialise the context with the cipher only (no key, no nonce);
    2. set the IV length to ``len(nonce)``;
    3. on decrypt, hand over the expected ``tag``; on encrypt with a name
       ending in ``-ccm``, set the tag length to ``tag_len``;
    4. initialise again, this time with the key and nonce.

    ``operation`` is ``_ENCRYPT`` or ``_DECRYPT``. ``tag`` must not be None
    when decrypting. Returns the context, wrapped with ``ffi.gc`` so
    ``EVP_CIPHER_CTX_free`` is its destructor.
    """
    evp_cipher = _evp_cipher(cipher_name, backend)
    ctx = backend._lib.EVP_CIPHER_CTX_new()
    # Fail loudly if allocation failed instead of passing a NULL context to
    # the calls below; _aead_create_ctx performs the same check.
    backend.openssl_assert(ctx != backend._ffi.NULL)
    ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        evp_cipher,
        backend._ffi.NULL,
        backend._ffi.NULL,
        backend._ffi.NULL,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    # CCM requires the IVLEN to be set before calling SET_TAG on decrypt
    res = backend._lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
        len(nonce),
        backend._ffi.NULL,
    )
    backend.openssl_assert(res != 0)
    if operation == _DECRYPT:
        assert tag is not None
        _set_tag(backend, ctx, tag)
    elif cipher_name.endswith(b"-ccm"):
        # On encrypt only the tag length is passed, with a NULL tag pointer.
        res = backend._lib.EVP_CIPHER_CTX_ctrl(
            ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, tag_len, backend._ffi.NULL
        )
        backend.openssl_assert(res != 0)

    nonce_ptr = backend._ffi.from_buffer(nonce)
    key_ptr = backend._ffi.from_buffer(key)
    # Second init: the cipher argument is NULL, only key and nonce are given.
    res = backend._lib.EVP_CipherInit_ex(
        ctx,
        backend._ffi.NULL,
        backend._ffi.NULL,
        key_ptr,
        nonce_ptr,
        int(operation == _ENCRYPT),
    )
    backend.openssl_assert(res != 0)
    return ctx
def _set_tag(backend, ctx, tag: bytes) -> None:
    """Hand ``tag`` to ``ctx`` through the ``EVP_CTRL_AEAD_SET_TAG`` control.

    A zero return from ``EVP_CIPHER_CTX_ctrl`` fails ``openssl_assert``.
    """
    tag_buf = backend._ffi.from_buffer(tag)
    lib = backend._lib
    status = lib.EVP_CIPHER_CTX_ctrl(
        ctx,
        lib.EVP_CTRL_AEAD_SET_TAG,
        len(tag),
        tag_buf,
    )
    backend.openssl_assert(status != 0)
def _set_nonce_operation(backend, ctx, nonce: bytes, operation: int) -> None:
    """Re-initialise an existing ``ctx`` with a nonce and a direction.

    The cipher, engine and key arguments of ``EVP_CipherInit_ex`` are all
    passed as NULL; only the nonce and the encrypt/decrypt flag are given.
    """
    ffi = backend._ffi
    nonce_buf = ffi.from_buffer(nonce)
    encrypting = int(operation == _ENCRYPT)
    status = backend._lib.EVP_CipherInit_ex(
        ctx, ffi.NULL, ffi.NULL, ffi.NULL, nonce_buf, encrypting
    )
    backend.openssl_assert(status != 0)
def _set_length(backend: "Backend", ctx, data_len: int) -> None:
    """Announce ``data_len`` to ``ctx`` before any data is processed.

    This is an ``EVP_CipherUpdate`` call whose input and output pointers are
    both NULL. Within this file it is only invoked for AESCCM.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    status = backend._lib.EVP_CipherUpdate(
        ctx,
        ffi.NULL,
        written,
        ffi.NULL,
        data_len,
    )
    backend.openssl_assert(status != 0)
def _process_aad(backend: "Backend", ctx, associated_data: bytes) -> None:
    """Feed one chunk of associated data into ``ctx``.

    The output pointer given to ``EVP_CipherUpdate`` is NULL, so no output
    bytes are collected from this call.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    aad_buf = ffi.from_buffer(associated_data)
    status = backend._lib.EVP_CipherUpdate(
        ctx,
        ffi.NULL,
        written,
        aad_buf,
        len(associated_data),
    )
    backend.openssl_assert(status != 0)
def _process_data(backend: "Backend", ctx, data: bytes) -> bytes:
    """Run ``data`` through ``ctx`` and return the bytes that were written.

    The output buffer is allocated with the same length as the input.

    Raises:
        InvalidTag: if ``EVP_CipherUpdate`` returns 0. The pending OpenSSL
            errors are consumed first.
    """
    ffi = backend._ffi
    written = ffi.new("int *")
    out_buf = ffi.new("unsigned char[]", len(data))
    in_buf = ffi.from_buffer(data)
    status = backend._lib.EVP_CipherUpdate(
        ctx, out_buf, written, in_buf, len(data)
    )
    if status != 0:
        return ffi.buffer(out_buf, written[0])[:]

    # AES SIV can error here if the data is invalid on decrypt
    backend._consume_errors()
    raise InvalidTag
def _encrypt(
    backend: "Backend",
    cipher: "_AEADTypes",
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """Encrypt ``data`` and return the ciphertext combined with its tag.

    When ``ctx`` is None a fresh context is built from ``cipher._key`` via
    ``_aead_setup``; otherwise the supplied context is re-initialised with
    ``nonce`` for encryption. Each element of ``associated_data`` is fed in
    as associated data before ``data`` is processed.

    Returns ``tag + ciphertext`` for AESSIV and ``ciphertext + tag`` for
    every other cipher.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    if ctx is not None:
        _set_nonce_operation(backend, ctx, nonce, _ENCRYPT)
    else:
        name = _aead_cipher_name(cipher)
        ctx = _aead_setup(
            backend, name, cipher._key, nonce, None, tag_length, _ENCRYPT
        )

    # CCM requires us to pass the length of the data before processing anything
    # However calling this with any other AEAD results in an error
    if isinstance(cipher, AESCCM):
        _set_length(backend, ctx, len(data))

    for aad_chunk in associated_data:
        _process_aad(backend, ctx, aad_chunk)
    ciphertext = _process_data(backend, ctx, data)

    ffi = backend._ffi
    lib = backend._lib

    final_len = ffi.new("int *")
    # All AEADs we support besides OCB are streaming so they return nothing
    # in finalization. OCB can return up to (16 byte block - 1) bytes so
    # we need a buffer here too.
    final_buf = ffi.new("unsigned char[]", 16)
    status = lib.EVP_CipherFinal_ex(ctx, final_buf, final_len)
    backend.openssl_assert(status != 0)
    ciphertext += ffi.buffer(final_buf, final_len[0])[:]

    tag_buf = ffi.new("unsigned char[]", tag_length)
    status = lib.EVP_CIPHER_CTX_ctrl(
        ctx, lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
    )
    backend.openssl_assert(status != 0)
    tag = ffi.buffer(tag_buf)[:]

    if not isinstance(cipher, AESSIV):
        return ciphertext + tag

    # RFC 5297 defines the output as IV || C, where the tag we generate is
    # the "IV" and C is the ciphertext. This is the opposite of our
    # other AEADs, which are Ciphertext || Tag
    backend.openssl_assert(len(tag) == 16)
    return tag + ciphertext
def _decrypt(
    backend: "Backend",
    cipher: "_AEADTypes",
    nonce: bytes,
    data: bytes,
    associated_data: typing.List[bytes],
    tag_length: int,
    ctx: typing.Any = None,
) -> bytes:
    """Authenticate and decrypt ``data``, returning the plaintext.

    ``data`` holds the ciphertext and the tag: ``tag || ciphertext`` for
    AESSIV, ``ciphertext || tag`` for every other cipher. When ``ctx`` is
    None a fresh context is built from ``cipher._key`` via ``_aead_setup``;
    otherwise the supplied context is re-initialised with ``nonce`` and
    given the tag. Each element of ``associated_data`` is fed in as
    associated data before the ciphertext is processed.

    Raises:
        InvalidTag: if ``data`` is shorter than ``tag_length``, if the
            update step fails, or (non-CCM ciphers) if finalization fails.
    """
    from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV

    if len(data) < tag_length:
        raise InvalidTag

    if isinstance(cipher, AESSIV):
        # RFC 5297 defines the output as IV || C, where the tag we generate is
        # the "IV" and C is the ciphertext. This is the opposite of our
        # other AEADs, which are Ciphertext || Tag
        tag = data[:tag_length]
        data = data[tag_length:]
    else:
        # Split at an explicit index instead of using negative slices:
        # ``data[-0:]`` would be the whole input rather than an empty tag.
        split = len(data) - tag_length
        tag = data[split:]
        data = data[:split]
    if ctx is None:
        cipher_name = _aead_cipher_name(cipher)
        ctx = _aead_setup(
            backend, cipher_name, cipher._key, nonce, tag, tag_length, _DECRYPT
        )
    else:
        _set_nonce_operation(backend, ctx, nonce, _DECRYPT)
        _set_tag(backend, ctx, tag)

    # CCM requires us to pass the length of the data before processing anything
    # However calling this with any other AEAD results in an error
    if isinstance(cipher, AESCCM):
        _set_length(backend, ctx, len(data))

    for ad in associated_data:
        _process_aad(backend, ctx, ad)
    # CCM has a different error path if the tag doesn't match. Errors are
    # raised in Update and Final is irrelevant.
    if isinstance(cipher, AESCCM):
        outlen = backend._ffi.new("int *")
        buf = backend._ffi.new("unsigned char[]", len(data))
        d_ptr = backend._ffi.from_buffer(data)
        res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, d_ptr, len(data))
        if res != 1:
            backend._consume_errors()
            raise InvalidTag

        processed_data = backend._ffi.buffer(buf, outlen[0])[:]
    else:
        processed_data = _process_data(backend, ctx, data)
        outlen = backend._ffi.new("int *")
        # OCB can return up to 15 bytes (16 byte block - 1) in finalization
        buf = backend._ffi.new("unsigned char[]", 16)
        res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
        # Check the finalization result before touching its output, so
        # nothing is appended to the plaintext when finalization failed.
        if res == 0:
            backend._consume_errors()
            raise InvalidTag
        processed_data += backend._ffi.buffer(buf, outlen[0])[:]

    return processed_data