Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/backends/openssl/aead.py: 11%
120 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
5import typing
7from cryptography.exceptions import InvalidTag
10if typing.TYPE_CHECKING:
11 from cryptography.hazmat.backends.openssl.backend import Backend
12 from cryptography.hazmat.primitives.ciphers.aead import (
13 AESCCM,
14 AESGCM,
15 AESOCB3,
16 AESSIV,
17 ChaCha20Poly1305,
18 )
20 _AEAD_TYPES = typing.Union[
21 AESCCM, AESGCM, AESOCB3, AESSIV, ChaCha20Poly1305
22 ]
24_ENCRYPT = 1
25_DECRYPT = 0
28def _aead_cipher_name(cipher: "_AEAD_TYPES") -> bytes:
29 from cryptography.hazmat.primitives.ciphers.aead import (
30 AESCCM,
31 AESGCM,
32 AESOCB3,
33 AESSIV,
34 ChaCha20Poly1305,
35 )
37 if isinstance(cipher, ChaCha20Poly1305):
38 return b"chacha20-poly1305"
39 elif isinstance(cipher, AESCCM):
40 return f"aes-{len(cipher._key) * 8}-ccm".encode("ascii")
41 elif isinstance(cipher, AESOCB3):
42 return f"aes-{len(cipher._key) * 8}-ocb".encode("ascii")
43 elif isinstance(cipher, AESSIV):
44 return f"aes-{len(cipher._key) * 8 // 2}-siv".encode("ascii")
45 else:
46 assert isinstance(cipher, AESGCM)
47 return f"aes-{len(cipher._key) * 8}-gcm".encode("ascii")
50def _evp_cipher(cipher_name: bytes, backend: "Backend"):
51 if cipher_name.endswith(b"-siv"):
52 evp_cipher = backend._lib.EVP_CIPHER_fetch(
53 backend._ffi.NULL,
54 cipher_name,
55 backend._ffi.NULL,
56 )
57 backend.openssl_assert(evp_cipher != backend._ffi.NULL)
58 evp_cipher = backend._ffi.gc(evp_cipher, backend._lib.EVP_CIPHER_free)
59 else:
60 evp_cipher = backend._lib.EVP_get_cipherbyname(cipher_name)
61 backend.openssl_assert(evp_cipher != backend._ffi.NULL)
63 return evp_cipher
66def _aead_setup(
67 backend: "Backend",
68 cipher_name: bytes,
69 key: bytes,
70 nonce: bytes,
71 tag: typing.Optional[bytes],
72 tag_len: int,
73 operation: int,
74):
75 evp_cipher = _evp_cipher(cipher_name, backend)
76 ctx = backend._lib.EVP_CIPHER_CTX_new()
77 ctx = backend._ffi.gc(ctx, backend._lib.EVP_CIPHER_CTX_free)
78 res = backend._lib.EVP_CipherInit_ex(
79 ctx,
80 evp_cipher,
81 backend._ffi.NULL,
82 backend._ffi.NULL,
83 backend._ffi.NULL,
84 int(operation == _ENCRYPT),
85 )
86 backend.openssl_assert(res != 0)
87 res = backend._lib.EVP_CIPHER_CTX_set_key_length(ctx, len(key))
88 backend.openssl_assert(res != 0)
89 res = backend._lib.EVP_CIPHER_CTX_ctrl(
90 ctx,
91 backend._lib.EVP_CTRL_AEAD_SET_IVLEN,
92 len(nonce),
93 backend._ffi.NULL,
94 )
95 backend.openssl_assert(res != 0)
96 if operation == _DECRYPT:
97 assert tag is not None
98 res = backend._lib.EVP_CIPHER_CTX_ctrl(
99 ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, len(tag), tag
100 )
101 backend.openssl_assert(res != 0)
102 elif cipher_name.endswith(b"-ccm"):
103 res = backend._lib.EVP_CIPHER_CTX_ctrl(
104 ctx, backend._lib.EVP_CTRL_AEAD_SET_TAG, tag_len, backend._ffi.NULL
105 )
106 backend.openssl_assert(res != 0)
108 nonce_ptr = backend._ffi.from_buffer(nonce)
109 key_ptr = backend._ffi.from_buffer(key)
110 res = backend._lib.EVP_CipherInit_ex(
111 ctx,
112 backend._ffi.NULL,
113 backend._ffi.NULL,
114 key_ptr,
115 nonce_ptr,
116 int(operation == _ENCRYPT),
117 )
118 backend.openssl_assert(res != 0)
119 return ctx
122def _set_length(backend: "Backend", ctx, data_len: int) -> None:
123 intptr = backend._ffi.new("int *")
124 res = backend._lib.EVP_CipherUpdate(
125 ctx, backend._ffi.NULL, intptr, backend._ffi.NULL, data_len
126 )
127 backend.openssl_assert(res != 0)
130def _process_aad(backend: "Backend", ctx, associated_data: bytes) -> None:
131 outlen = backend._ffi.new("int *")
132 res = backend._lib.EVP_CipherUpdate(
133 ctx, backend._ffi.NULL, outlen, associated_data, len(associated_data)
134 )
135 backend.openssl_assert(res != 0)
138def _process_data(backend: "Backend", ctx, data: bytes) -> bytes:
139 outlen = backend._ffi.new("int *")
140 buf = backend._ffi.new("unsigned char[]", len(data))
141 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
142 if res == 0:
143 # AES SIV can error here if the data is invalid on decrypt
144 backend._consume_errors()
145 raise InvalidTag
146 return backend._ffi.buffer(buf, outlen[0])[:]
149def _encrypt(
150 backend: "Backend",
151 cipher: "_AEAD_TYPES",
152 nonce: bytes,
153 data: bytes,
154 associated_data: typing.List[bytes],
155 tag_length: int,
156) -> bytes:
157 from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV
159 cipher_name = _aead_cipher_name(cipher)
160 ctx = _aead_setup(
161 backend, cipher_name, cipher._key, nonce, None, tag_length, _ENCRYPT
162 )
163 # CCM requires us to pass the length of the data before processing anything
164 # However calling this with any other AEAD results in an error
165 if isinstance(cipher, AESCCM):
166 _set_length(backend, ctx, len(data))
168 for ad in associated_data:
169 _process_aad(backend, ctx, ad)
170 processed_data = _process_data(backend, ctx, data)
171 outlen = backend._ffi.new("int *")
172 # All AEADs we support besides OCB are streaming so they return nothing
173 # in finalization. OCB can return up to (16 byte block - 1) bytes so
174 # we need a buffer here too.
175 buf = backend._ffi.new("unsigned char[]", 16)
176 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
177 backend.openssl_assert(res != 0)
178 processed_data += backend._ffi.buffer(buf, outlen[0])[:]
179 tag_buf = backend._ffi.new("unsigned char[]", tag_length)
180 res = backend._lib.EVP_CIPHER_CTX_ctrl(
181 ctx, backend._lib.EVP_CTRL_AEAD_GET_TAG, tag_length, tag_buf
182 )
183 backend.openssl_assert(res != 0)
184 tag = backend._ffi.buffer(tag_buf)[:]
186 if isinstance(cipher, AESSIV):
187 # RFC 5297 defines the output as IV || C, where the tag we generate is
188 # the "IV" and C is the ciphertext. This is the opposite of our
189 # other AEADs, which are Ciphertext || Tag
190 backend.openssl_assert(len(tag) == 16)
191 return tag + processed_data
192 else:
193 return processed_data + tag
196def _decrypt(
197 backend: "Backend",
198 cipher: "_AEAD_TYPES",
199 nonce: bytes,
200 data: bytes,
201 associated_data: typing.List[bytes],
202 tag_length: int,
203) -> bytes:
204 from cryptography.hazmat.primitives.ciphers.aead import AESCCM, AESSIV
206 if len(data) < tag_length:
207 raise InvalidTag
209 if isinstance(cipher, AESSIV):
210 # RFC 5297 defines the output as IV || C, where the tag we generate is
211 # the "IV" and C is the ciphertext. This is the opposite of our
212 # other AEADs, which are Ciphertext || Tag
213 tag = data[:tag_length]
214 data = data[tag_length:]
215 else:
216 tag = data[-tag_length:]
217 data = data[:-tag_length]
218 cipher_name = _aead_cipher_name(cipher)
219 ctx = _aead_setup(
220 backend, cipher_name, cipher._key, nonce, tag, tag_length, _DECRYPT
221 )
222 # CCM requires us to pass the length of the data before processing anything
223 # However calling this with any other AEAD results in an error
224 if isinstance(cipher, AESCCM):
225 _set_length(backend, ctx, len(data))
227 for ad in associated_data:
228 _process_aad(backend, ctx, ad)
229 # CCM has a different error path if the tag doesn't match. Errors are
230 # raised in Update and Final is irrelevant.
231 if isinstance(cipher, AESCCM):
232 outlen = backend._ffi.new("int *")
233 buf = backend._ffi.new("unsigned char[]", len(data))
234 res = backend._lib.EVP_CipherUpdate(ctx, buf, outlen, data, len(data))
235 if res != 1:
236 backend._consume_errors()
237 raise InvalidTag
239 processed_data = backend._ffi.buffer(buf, outlen[0])[:]
240 else:
241 processed_data = _process_data(backend, ctx, data)
242 outlen = backend._ffi.new("int *")
243 # OCB can return up to 15 bytes (16 byte block - 1) in finalization
244 buf = backend._ffi.new("unsigned char[]", 16)
245 res = backend._lib.EVP_CipherFinal_ex(ctx, buf, outlen)
246 processed_data += backend._ffi.buffer(buf, outlen[0])[:]
247 if res == 0:
248 backend._consume_errors()
249 raise InvalidTag
251 return processed_data