1# Copyright 2013-2018 Donald Stufft and individual contributors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14from typing import ByteString, Optional, Tuple, cast
15
16from nacl import exceptions as exc
17from nacl._sodium import ffi, lib
18from nacl.exceptions import ensure
19
20
21crypto_secretstream_xchacha20poly1305_ABYTES: int = (
22 lib.crypto_secretstream_xchacha20poly1305_abytes()
23)
24crypto_secretstream_xchacha20poly1305_HEADERBYTES: int = (
25 lib.crypto_secretstream_xchacha20poly1305_headerbytes()
26)
27crypto_secretstream_xchacha20poly1305_KEYBYTES: int = (
28 lib.crypto_secretstream_xchacha20poly1305_keybytes()
29)
30crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX: int = (
31 lib.crypto_secretstream_xchacha20poly1305_messagebytes_max()
32)
33crypto_secretstream_xchacha20poly1305_STATEBYTES: int = (
34 lib.crypto_secretstream_xchacha20poly1305_statebytes()
35)
36
37
38crypto_secretstream_xchacha20poly1305_TAG_MESSAGE: int = (
39 lib.crypto_secretstream_xchacha20poly1305_tag_message()
40)
41crypto_secretstream_xchacha20poly1305_TAG_PUSH: int = (
42 lib.crypto_secretstream_xchacha20poly1305_tag_push()
43)
44crypto_secretstream_xchacha20poly1305_TAG_REKEY: int = (
45 lib.crypto_secretstream_xchacha20poly1305_tag_rekey()
46)
47crypto_secretstream_xchacha20poly1305_TAG_FINAL: int = (
48 lib.crypto_secretstream_xchacha20poly1305_tag_final()
49)
50
51
52def crypto_secretstream_xchacha20poly1305_keygen() -> bytes:
53 """
54 Generate a key for use with
55 :func:`.crypto_secretstream_xchacha20poly1305_init_push`.
56
57 """
58 keybuf = ffi.new(
59 "unsigned char[]",
60 crypto_secretstream_xchacha20poly1305_KEYBYTES,
61 )
62 lib.crypto_secretstream_xchacha20poly1305_keygen(keybuf)
63 return ffi.buffer(keybuf)[:]
64
65
66class crypto_secretstream_xchacha20poly1305_state:
67 """
68 An object wrapping the crypto_secretstream_xchacha20poly1305 state.
69
70 """
71
72 __slots__ = ["statebuf", "rawbuf", "tagbuf"]
73
74 def __init__(self) -> None:
75 """Initialize a clean state object."""
76 self.statebuf: ByteString = ffi.new(
77 "unsigned char[]",
78 crypto_secretstream_xchacha20poly1305_STATEBYTES,
79 )
80
81 self.rawbuf: Optional[ByteString] = None
82 self.tagbuf: Optional[ByteString] = None
83
84
85def crypto_secretstream_xchacha20poly1305_init_push(
86 state: crypto_secretstream_xchacha20poly1305_state, key: bytes
87) -> bytes:
88 """
89 Initialize a crypto_secretstream_xchacha20poly1305 encryption buffer.
90
91 :param state: a secretstream state object
92 :type state: crypto_secretstream_xchacha20poly1305_state
93 :param key: must be
94 :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long
95 :type key: bytes
96 :return: header
97 :rtype: bytes
98
99 """
100 ensure(
101 isinstance(state, crypto_secretstream_xchacha20poly1305_state),
102 "State must be a crypto_secretstream_xchacha20poly1305_state object",
103 raising=exc.TypeError,
104 )
105 ensure(
106 isinstance(key, bytes),
107 "Key must be a bytes sequence",
108 raising=exc.TypeError,
109 )
110 ensure(
111 len(key) == crypto_secretstream_xchacha20poly1305_KEYBYTES,
112 "Invalid key length",
113 raising=exc.ValueError,
114 )
115
116 headerbuf = ffi.new(
117 "unsigned char []",
118 crypto_secretstream_xchacha20poly1305_HEADERBYTES,
119 )
120
121 rc = lib.crypto_secretstream_xchacha20poly1305_init_push(
122 state.statebuf, headerbuf, key
123 )
124 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
125
126 return ffi.buffer(headerbuf)[:]
127
128
129def crypto_secretstream_xchacha20poly1305_push(
130 state: crypto_secretstream_xchacha20poly1305_state,
131 m: bytes,
132 ad: Optional[bytes] = None,
133 tag: int = crypto_secretstream_xchacha20poly1305_TAG_MESSAGE,
134) -> bytes:
135 """
136 Add an encrypted message to the secret stream.
137
138 :param state: a secretstream state object
139 :type state: crypto_secretstream_xchacha20poly1305_state
140 :param m: the message to encrypt, the maximum length of an individual
141 message is
142 :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX`.
143 :type m: bytes
144 :param ad: additional data to include in the authentication tag
145 :type ad: bytes or None
146 :param tag: the message tag, usually
147 :data:`.crypto_secretstream_xchacha20poly1305_TAG_MESSAGE` or
148 :data:`.crypto_secretstream_xchacha20poly1305_TAG_FINAL`.
149 :type tag: int
150 :return: ciphertext
151 :rtype: bytes
152
153 """
154 ensure(
155 isinstance(state, crypto_secretstream_xchacha20poly1305_state),
156 "State must be a crypto_secretstream_xchacha20poly1305_state object",
157 raising=exc.TypeError,
158 )
159 ensure(isinstance(m, bytes), "Message is not bytes", raising=exc.TypeError)
160 ensure(
161 len(m) <= crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX,
162 "Message is too long",
163 raising=exc.ValueError,
164 )
165 ensure(
166 ad is None or isinstance(ad, bytes),
167 "Additional data must be bytes or None",
168 raising=exc.TypeError,
169 )
170
171 clen = len(m) + crypto_secretstream_xchacha20poly1305_ABYTES
172 if state.rawbuf is None or len(state.rawbuf) < clen:
173 state.rawbuf = ffi.new("unsigned char[]", clen)
174
175 if ad is None:
176 ad = ffi.NULL
177 adlen = 0
178 else:
179 adlen = len(ad)
180
181 rc = lib.crypto_secretstream_xchacha20poly1305_push(
182 state.statebuf,
183 state.rawbuf,
184 ffi.NULL,
185 m,
186 len(m),
187 ad,
188 adlen,
189 tag,
190 )
191 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
192
193 return ffi.buffer(state.rawbuf, clen)[:]
194
195
196def crypto_secretstream_xchacha20poly1305_init_pull(
197 state: crypto_secretstream_xchacha20poly1305_state,
198 header: bytes,
199 key: bytes,
200) -> None:
201 """
202 Initialize a crypto_secretstream_xchacha20poly1305 decryption buffer.
203
204 :param state: a secretstream state object
205 :type state: crypto_secretstream_xchacha20poly1305_state
206 :param header: must be
207 :data:`.crypto_secretstream_xchacha20poly1305_HEADERBYTES` long
208 :type header: bytes
209 :param key: must be
210 :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long
211 :type key: bytes
212
213 """
214 ensure(
215 isinstance(state, crypto_secretstream_xchacha20poly1305_state),
216 "State must be a crypto_secretstream_xchacha20poly1305_state object",
217 raising=exc.TypeError,
218 )
219 ensure(
220 isinstance(header, bytes),
221 "Header must be a bytes sequence",
222 raising=exc.TypeError,
223 )
224 ensure(
225 len(header) == crypto_secretstream_xchacha20poly1305_HEADERBYTES,
226 "Invalid header length",
227 raising=exc.ValueError,
228 )
229 ensure(
230 isinstance(key, bytes),
231 "Key must be a bytes sequence",
232 raising=exc.TypeError,
233 )
234 ensure(
235 len(key) == crypto_secretstream_xchacha20poly1305_KEYBYTES,
236 "Invalid key length",
237 raising=exc.ValueError,
238 )
239
240 if state.tagbuf is None:
241 state.tagbuf = ffi.new("unsigned char *")
242
243 rc = lib.crypto_secretstream_xchacha20poly1305_init_pull(
244 state.statebuf, header, key
245 )
246 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
247
248
249def crypto_secretstream_xchacha20poly1305_pull(
250 state: crypto_secretstream_xchacha20poly1305_state,
251 c: bytes,
252 ad: Optional[bytes] = None,
253) -> Tuple[bytes, int]:
254 """
255 Read a decrypted message from the secret stream.
256
257 :param state: a secretstream state object
258 :type state: crypto_secretstream_xchacha20poly1305_state
259 :param c: the ciphertext to decrypt, the maximum length of an individual
260 ciphertext is
261 :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX` +
262 :data:`.crypto_secretstream_xchacha20poly1305_ABYTES`.
263 :type c: bytes
264 :param ad: additional data to include in the authentication tag
265 :type ad: bytes or None
266 :return: (message, tag)
267 :rtype: (bytes, int)
268
269 """
270 ensure(
271 isinstance(state, crypto_secretstream_xchacha20poly1305_state),
272 "State must be a crypto_secretstream_xchacha20poly1305_state object",
273 raising=exc.TypeError,
274 )
275 ensure(
276 state.tagbuf is not None,
277 (
278 "State must be initialized using "
279 "crypto_secretstream_xchacha20poly1305_init_pull"
280 ),
281 raising=exc.ValueError,
282 )
283 ensure(
284 isinstance(c, bytes),
285 "Ciphertext is not bytes",
286 raising=exc.TypeError,
287 )
288 ensure(
289 len(c) >= crypto_secretstream_xchacha20poly1305_ABYTES,
290 "Ciphertext is too short",
291 raising=exc.ValueError,
292 )
293 ensure(
294 len(c)
295 <= (
296 crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX
297 + crypto_secretstream_xchacha20poly1305_ABYTES
298 ),
299 "Ciphertext is too long",
300 raising=exc.ValueError,
301 )
302 ensure(
303 ad is None or isinstance(ad, bytes),
304 "Additional data must be bytes or None",
305 raising=exc.TypeError,
306 )
307
308 mlen = len(c) - crypto_secretstream_xchacha20poly1305_ABYTES
309 if state.rawbuf is None or len(state.rawbuf) < mlen:
310 state.rawbuf = ffi.new("unsigned char[]", mlen)
311
312 if ad is None:
313 ad = ffi.NULL
314 adlen = 0
315 else:
316 adlen = len(ad)
317
318 rc = lib.crypto_secretstream_xchacha20poly1305_pull(
319 state.statebuf,
320 state.rawbuf,
321 ffi.NULL,
322 state.tagbuf,
323 c,
324 len(c),
325 ad,
326 adlen,
327 )
328 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError)
329
330 # Cast safety: we `ensure` above that `state.tagbuf is not None`.
331 return (
332 ffi.buffer(state.rawbuf, mlen)[:],
333 int(cast(bytes, state.tagbuf)[0]),
334 )
335
336
337def crypto_secretstream_xchacha20poly1305_rekey(
338 state: crypto_secretstream_xchacha20poly1305_state,
339) -> None:
340 """
341 Explicitly change the encryption key in the stream.
342
343 Normally the stream is re-keyed as needed or an explicit ``tag`` of
344 :data:`.crypto_secretstream_xchacha20poly1305_TAG_REKEY` is added to a
345 message to ensure forward secrecy, but this method can be used instead
346 if the re-keying is controlled without adding the tag.
347
348 :param state: a secretstream state object
349 :type state: crypto_secretstream_xchacha20poly1305_state
350
351 """
352 ensure(
353 isinstance(state, crypto_secretstream_xchacha20poly1305_state),
354 "State must be a crypto_secretstream_xchacha20poly1305_state object",
355 raising=exc.TypeError,
356 )
357 lib.crypto_secretstream_xchacha20poly1305_rekey(state.statebuf)