1# Copyright 2013-2018 Donald Stufft and individual contributors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14from typing import Optional, Tuple, Union, cast
15
16from nacl import exceptions as exc
17from nacl._sodium import ffi, lib
18from nacl.exceptions import ensure
19
20
# Sizes and limits, queried from the linked library at import time.

# Number of bytes by which a pushed ciphertext exceeds its message.
crypto_secretstream_xchacha20poly1305_ABYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_abytes()
)
# Length of the header returned by init_push and required by init_pull.
crypto_secretstream_xchacha20poly1305_HEADERBYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_headerbytes()
)
# Required key length for init_push / init_pull.
crypto_secretstream_xchacha20poly1305_KEYBYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_keybytes()
)
# Maximum length of an individual message accepted by push.
crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX: int = (
    lib.crypto_secretstream_xchacha20poly1305_messagebytes_max()
)
# Size of the buffer allocated for a stream state object.
crypto_secretstream_xchacha20poly1305_STATEBYTES: int = (
    lib.crypto_secretstream_xchacha20poly1305_statebytes()
)


# Message tag values, also queried from the library at import time.

# Default tag used by push.
crypto_secretstream_xchacha20poly1305_TAG_MESSAGE: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_message()
)
crypto_secretstream_xchacha20poly1305_TAG_PUSH: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_push()
)
# Tag that can be attached to a message to request a re-key of the stream.
crypto_secretstream_xchacha20poly1305_TAG_REKEY: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_rekey()
)
crypto_secretstream_xchacha20poly1305_TAG_FINAL: int = (
    lib.crypto_secretstream_xchacha20poly1305_tag_final()
)
50
51
def crypto_secretstream_xchacha20poly1305_keygen() -> bytes:
    """
    Produce a key that can be passed to
    :func:`.crypto_secretstream_xchacha20poly1305_init_push`.

    :return: a key of
        :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` bytes
    :rtype: bytes

    """
    key_len = crypto_secretstream_xchacha20poly1305_KEYBYTES
    key_out = ffi.new("unsigned char[]", key_len)
    # The library writes the key into ``key_out`` in place.
    lib.crypto_secretstream_xchacha20poly1305_keygen(key_out)
    # Slice the buffer to hand back an independent ``bytes`` copy.
    return ffi.buffer(key_out, key_len)[:]
64
65
class crypto_secretstream_xchacha20poly1305_state:
    """
    Holder for the buffers backing a crypto_secretstream_xchacha20poly1305
    stream.

    ``statebuf`` is allocated on construction. ``rawbuf`` (output scratch
    space) and ``tagbuf`` (tag output cell) start out as ``None`` and are
    filled in by the push/pull and init_pull functions of this module.

    """

    __slots__ = ["statebuf", "rawbuf", "tagbuf"]

    def __init__(self) -> None:
        """Allocate the state buffer and leave the scratch buffers unset."""
        self.statebuf: Union[bytes, bytearray, memoryview] = ffi.new(
            "unsigned char[]",
            crypto_secretstream_xchacha20poly1305_STATEBYTES,
        )

        # Lazily allocated by the functions that need them.
        self.rawbuf: Optional[Union[bytes, bytearray, memoryview]] = None
        self.tagbuf: Optional[Union[bytes, bytearray, memoryview]] = None
84
85
def crypto_secretstream_xchacha20poly1305_init_push(
    state: crypto_secretstream_xchacha20poly1305_state, key: bytes
) -> bytes:
    """
    Prepare ``state`` for encryption and return the stream header.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param key: must be
        :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long
    :type key: bytes
    :return: header,
        :data:`.crypto_secretstream_xchacha20poly1305_HEADERBYTES` long
    :rtype: bytes
    :raises nacl.exceptions.TypeError: if ``state`` or ``key`` has the
        wrong type
    :raises nacl.exceptions.ValueError: if ``key`` has the wrong length
    :raises nacl.exceptions.RuntimeError: if the library call fails

    """
    state_type = crypto_secretstream_xchacha20poly1305_state
    key_len = crypto_secretstream_xchacha20poly1305_KEYBYTES
    header_len = crypto_secretstream_xchacha20poly1305_HEADERBYTES

    # Argument checks run in order: state type, key type, key length.
    ensure(
        isinstance(state, state_type),
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )
    ensure(
        isinstance(key, bytes),
        "Key must be a bytes sequence",
        raising=exc.TypeError,
    )
    ensure(len(key) == key_len, "Invalid key length", raising=exc.ValueError)

    # The library writes the header into this buffer.
    header_out = ffi.new("unsigned char []", header_len)

    status = lib.crypto_secretstream_xchacha20poly1305_init_push(
        state.statebuf, header_out, key
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)

    return ffi.buffer(header_out)[:]
128
129
def crypto_secretstream_xchacha20poly1305_push(
    state: crypto_secretstream_xchacha20poly1305_state,
    m: bytes,
    ad: Optional[bytes] = None,
    tag: int = crypto_secretstream_xchacha20poly1305_TAG_MESSAGE,
) -> bytes:
    """
    Encrypt one message and append it to the secret stream.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param m: the message to encrypt, the maximum length of an individual
        message is
        :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX`.
    :type m: bytes
    :param ad: additional data to include in the authentication tag
    :type ad: bytes or None
    :param tag: the message tag, usually
        :data:`.crypto_secretstream_xchacha20poly1305_TAG_MESSAGE` or
        :data:`.crypto_secretstream_xchacha20poly1305_TAG_FINAL`.
    :type tag: int
    :return: ciphertext, ``len(m)`` +
        :data:`.crypto_secretstream_xchacha20poly1305_ABYTES` long
    :rtype: bytes
    :raises nacl.exceptions.TypeError: if ``state``, ``m`` or ``ad`` has
        the wrong type
    :raises nacl.exceptions.ValueError: if ``m`` is too long
    :raises nacl.exceptions.RuntimeError: if the library call fails

    """
    state_type = crypto_secretstream_xchacha20poly1305_state

    ensure(
        isinstance(state, state_type),
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )
    ensure(isinstance(m, bytes), "Message is not bytes", raising=exc.TypeError)
    mlen = len(m)
    ensure(
        mlen <= crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX,
        "Message is too long",
        raising=exc.ValueError,
    )
    ensure(
        ad is None or isinstance(ad, bytes),
        "Additional data must be bytes or None",
        raising=exc.TypeError,
    )

    clen = mlen + crypto_secretstream_xchacha20poly1305_ABYTES

    # Reuse the scratch buffer held by the state when it is large enough;
    # otherwise replace it with one sized for this ciphertext.
    must_allocate = state.rawbuf is None or len(state.rawbuf) < clen
    if must_allocate:
        state.rawbuf = ffi.new("unsigned char[]", clen)

    # Absent additional data is passed to the library as NULL with length 0.
    ad_ptr, ad_len = (ffi.NULL, 0) if ad is None else (ad, len(ad))

    status = lib.crypto_secretstream_xchacha20poly1305_push(
        state.statebuf,
        state.rawbuf,
        ffi.NULL,
        m,
        mlen,
        ad_ptr,
        ad_len,
        tag,
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)

    # Only the first ``clen`` bytes belong to this message; the scratch
    # buffer may be longer if it was sized by an earlier call.
    return ffi.buffer(state.rawbuf, clen)[:]
195
196
def crypto_secretstream_xchacha20poly1305_init_pull(
    state: crypto_secretstream_xchacha20poly1305_state,
    header: bytes,
    key: bytes,
) -> None:
    """
    Prepare ``state`` for decryption of a stream.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param header: must be
        :data:`.crypto_secretstream_xchacha20poly1305_HEADERBYTES` long
    :type header: bytes
    :param key: must be
        :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long
    :type key: bytes
    :raises nacl.exceptions.TypeError: if ``state``, ``header`` or ``key``
        has the wrong type
    :raises nacl.exceptions.ValueError: if ``header`` or ``key`` has the
        wrong length
    :raises nacl.exceptions.RuntimeError: if the library call fails

    """
    state_type = crypto_secretstream_xchacha20poly1305_state
    header_len = crypto_secretstream_xchacha20poly1305_HEADERBYTES
    key_len = crypto_secretstream_xchacha20poly1305_KEYBYTES

    # Argument checks run in order: state, header type/length, key
    # type/length.
    ensure(
        isinstance(state, state_type),
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )
    ensure(
        isinstance(header, bytes),
        "Header must be a bytes sequence",
        raising=exc.TypeError,
    )
    ensure(
        len(header) == header_len,
        "Invalid header length",
        raising=exc.ValueError,
    )
    ensure(
        isinstance(key, bytes),
        "Key must be a bytes sequence",
        raising=exc.TypeError,
    )
    ensure(len(key) == key_len, "Invalid key length", raising=exc.ValueError)

    # The tag cell doubles as the marker the pull function checks to confirm
    # that this initializer has been run on the state.
    if state.tagbuf is None:
        state.tagbuf = ffi.new("unsigned char *")

    status = lib.crypto_secretstream_xchacha20poly1305_init_pull(
        state.statebuf, header, key
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)
248
249
def crypto_secretstream_xchacha20poly1305_pull(
    state: crypto_secretstream_xchacha20poly1305_state,
    c: bytes,
    ad: Optional[bytes] = None,
) -> Tuple[bytes, int]:
    """
    Decrypt the next message of the secret stream.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :param c: the ciphertext to decrypt, the maximum length of an individual
        ciphertext is
        :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX` +
        :data:`.crypto_secretstream_xchacha20poly1305_ABYTES`.
    :type c: bytes
    :param ad: additional data to include in the authentication tag
    :type ad: bytes or None
    :return: (message, tag)
    :rtype: (bytes, int)
    :raises nacl.exceptions.TypeError: if ``state``, ``c`` or ``ad`` has
        the wrong type
    :raises nacl.exceptions.ValueError: if ``state`` was not initialized
        with :func:`.crypto_secretstream_xchacha20poly1305_init_pull`, or
        ``c`` is too short or too long
    :raises nacl.exceptions.RuntimeError: if the library call reports a
        failure

    """
    state_type = crypto_secretstream_xchacha20poly1305_state
    overhead = crypto_secretstream_xchacha20poly1305_ABYTES
    max_clen = (
        crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX + overhead
    )

    ensure(
        isinstance(state, state_type),
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )
    ensure(
        state.tagbuf is not None,
        (
            "State must be initialized using "
            "crypto_secretstream_xchacha20poly1305_init_pull"
        ),
        raising=exc.ValueError,
    )
    ensure(
        isinstance(c, bytes),
        "Ciphertext is not bytes",
        raising=exc.TypeError,
    )
    clen = len(c)
    ensure(clen >= overhead, "Ciphertext is too short", raising=exc.ValueError)
    ensure(clen <= max_clen, "Ciphertext is too long", raising=exc.ValueError)
    ensure(
        ad is None or isinstance(ad, bytes),
        "Additional data must be bytes or None",
        raising=exc.TypeError,
    )

    mlen = clen - overhead

    # Reuse the scratch buffer held by the state when it is large enough;
    # otherwise replace it with one sized for this message.
    must_allocate = state.rawbuf is None or len(state.rawbuf) < mlen
    if must_allocate:
        state.rawbuf = ffi.new("unsigned char[]", mlen)

    # Absent additional data is passed to the library as NULL with length 0.
    ad_ptr, ad_len = (ffi.NULL, 0) if ad is None else (ad, len(ad))

    status = lib.crypto_secretstream_xchacha20poly1305_pull(
        state.statebuf,
        state.rawbuf,
        ffi.NULL,
        state.tagbuf,
        c,
        clen,
        ad_ptr,
        ad_len,
    )
    ensure(status == 0, "Unexpected failure", raising=exc.RuntimeError)

    # Only the first ``mlen`` bytes of the scratch buffer belong to this
    # message.
    message = ffi.buffer(state.rawbuf, mlen)[:]
    # Cast safety: we `ensure` above that `state.tagbuf is not None`.
    message_tag = int(cast(bytes, state.tagbuf)[0])
    return (message, message_tag)
336
337
def crypto_secretstream_xchacha20poly1305_rekey(
    state: crypto_secretstream_xchacha20poly1305_state,
) -> None:
    """
    Force a change of the encryption key used by the stream.

    Re-keying normally happens as needed, or is requested by pushing a
    message with an explicit ``tag`` of
    :data:`.crypto_secretstream_xchacha20poly1305_TAG_REKEY` to ensure
    forward secrecy. Call this function instead when re-keying is
    coordinated without adding that tag.

    :param state: a secretstream state object
    :type state: crypto_secretstream_xchacha20poly1305_state
    :raises nacl.exceptions.TypeError: if ``state`` has the wrong type

    """
    state_type = crypto_secretstream_xchacha20poly1305_state
    ensure(
        isinstance(state, state_type),
        "State must be a crypto_secretstream_xchacha20poly1305_state object",
        raising=exc.TypeError,
    )
    # The library updates the state buffer in place.
    lib.crypto_secretstream_xchacha20poly1305_rekey(state.statebuf)