Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PyNaCl-1.6.0.dev1-py3.11-linux-x86_64.egg/nacl/bindings/crypto_secretstream.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

78 statements  

1# Copyright 2013-2018 Donald Stufft and individual contributors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14from typing import Optional, Tuple, Union, cast 

15 

16from nacl import exceptions as exc 

17from nacl._sodium import ffi, lib 

18from nacl.exceptions import ensure 

19 

20 

21crypto_secretstream_xchacha20poly1305_ABYTES: int = ( 

22 lib.crypto_secretstream_xchacha20poly1305_abytes() 

23) 

24crypto_secretstream_xchacha20poly1305_HEADERBYTES: int = ( 

25 lib.crypto_secretstream_xchacha20poly1305_headerbytes() 

26) 

27crypto_secretstream_xchacha20poly1305_KEYBYTES: int = ( 

28 lib.crypto_secretstream_xchacha20poly1305_keybytes() 

29) 

30crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX: int = ( 

31 lib.crypto_secretstream_xchacha20poly1305_messagebytes_max() 

32) 

33crypto_secretstream_xchacha20poly1305_STATEBYTES: int = ( 

34 lib.crypto_secretstream_xchacha20poly1305_statebytes() 

35) 

36 

37 

38crypto_secretstream_xchacha20poly1305_TAG_MESSAGE: int = ( 

39 lib.crypto_secretstream_xchacha20poly1305_tag_message() 

40) 

41crypto_secretstream_xchacha20poly1305_TAG_PUSH: int = ( 

42 lib.crypto_secretstream_xchacha20poly1305_tag_push() 

43) 

44crypto_secretstream_xchacha20poly1305_TAG_REKEY: int = ( 

45 lib.crypto_secretstream_xchacha20poly1305_tag_rekey() 

46) 

47crypto_secretstream_xchacha20poly1305_TAG_FINAL: int = ( 

48 lib.crypto_secretstream_xchacha20poly1305_tag_final() 

49) 

50 

51 

52def crypto_secretstream_xchacha20poly1305_keygen() -> bytes: 

53 """ 

54 Generate a key for use with 

55 :func:`.crypto_secretstream_xchacha20poly1305_init_push`. 

56 

57 """ 

58 keybuf = ffi.new( 

59 "unsigned char[]", 

60 crypto_secretstream_xchacha20poly1305_KEYBYTES, 

61 ) 

62 lib.crypto_secretstream_xchacha20poly1305_keygen(keybuf) 

63 return ffi.buffer(keybuf)[:] 

64 

65 

66class crypto_secretstream_xchacha20poly1305_state: 

67 """ 

68 An object wrapping the crypto_secretstream_xchacha20poly1305 state. 

69 

70 """ 

71 

72 __slots__ = ["statebuf", "rawbuf", "tagbuf"] 

73 

74 def __init__(self) -> None: 

75 """Initialize a clean state object.""" 

76 ByteString = Union[bytes, bytearray, memoryview] 

77 self.statebuf: ByteString = ffi.new( 

78 "unsigned char[]", 

79 crypto_secretstream_xchacha20poly1305_STATEBYTES, 

80 ) 

81 

82 self.rawbuf: Optional[ByteString] = None 

83 self.tagbuf: Optional[ByteString] = None 

84 

85 

86def crypto_secretstream_xchacha20poly1305_init_push( 

87 state: crypto_secretstream_xchacha20poly1305_state, key: bytes 

88) -> bytes: 

89 """ 

90 Initialize a crypto_secretstream_xchacha20poly1305 encryption buffer. 

91 

92 :param state: a secretstream state object 

93 :type state: crypto_secretstream_xchacha20poly1305_state 

94 :param key: must be 

95 :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long 

96 :type key: bytes 

97 :return: header 

98 :rtype: bytes 

99 

100 """ 

101 ensure( 

102 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

103 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

104 raising=exc.TypeError, 

105 ) 

106 ensure( 

107 isinstance(key, bytes), 

108 "Key must be a bytes sequence", 

109 raising=exc.TypeError, 

110 ) 

111 ensure( 

112 len(key) == crypto_secretstream_xchacha20poly1305_KEYBYTES, 

113 "Invalid key length", 

114 raising=exc.ValueError, 

115 ) 

116 

117 headerbuf = ffi.new( 

118 "unsigned char []", 

119 crypto_secretstream_xchacha20poly1305_HEADERBYTES, 

120 ) 

121 

122 rc = lib.crypto_secretstream_xchacha20poly1305_init_push( 

123 state.statebuf, headerbuf, key 

124 ) 

125 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

126 

127 return ffi.buffer(headerbuf)[:] 

128 

129 

130def crypto_secretstream_xchacha20poly1305_push( 

131 state: crypto_secretstream_xchacha20poly1305_state, 

132 m: bytes, 

133 ad: Optional[bytes] = None, 

134 tag: int = crypto_secretstream_xchacha20poly1305_TAG_MESSAGE, 

135) -> bytes: 

136 """ 

137 Add an encrypted message to the secret stream. 

138 

139 :param state: a secretstream state object 

140 :type state: crypto_secretstream_xchacha20poly1305_state 

141 :param m: the message to encrypt, the maximum length of an individual 

142 message is 

143 :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX`. 

144 :type m: bytes 

145 :param ad: additional data to include in the authentication tag 

146 :type ad: bytes or None 

147 :param tag: the message tag, usually 

148 :data:`.crypto_secretstream_xchacha20poly1305_TAG_MESSAGE` or 

149 :data:`.crypto_secretstream_xchacha20poly1305_TAG_FINAL`. 

150 :type tag: int 

151 :return: ciphertext 

152 :rtype: bytes 

153 

154 """ 

155 ensure( 

156 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

157 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

158 raising=exc.TypeError, 

159 ) 

160 ensure(isinstance(m, bytes), "Message is not bytes", raising=exc.TypeError) 

161 ensure( 

162 len(m) <= crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX, 

163 "Message is too long", 

164 raising=exc.ValueError, 

165 ) 

166 ensure( 

167 ad is None or isinstance(ad, bytes), 

168 "Additional data must be bytes or None", 

169 raising=exc.TypeError, 

170 ) 

171 

172 clen = len(m) + crypto_secretstream_xchacha20poly1305_ABYTES 

173 if state.rawbuf is None or len(state.rawbuf) < clen: 

174 state.rawbuf = ffi.new("unsigned char[]", clen) 

175 

176 if ad is None: 

177 ad = ffi.NULL 

178 adlen = 0 

179 else: 

180 adlen = len(ad) 

181 

182 rc = lib.crypto_secretstream_xchacha20poly1305_push( 

183 state.statebuf, 

184 state.rawbuf, 

185 ffi.NULL, 

186 m, 

187 len(m), 

188 ad, 

189 adlen, 

190 tag, 

191 ) 

192 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

193 

194 return ffi.buffer(state.rawbuf, clen)[:] 

195 

196 

197def crypto_secretstream_xchacha20poly1305_init_pull( 

198 state: crypto_secretstream_xchacha20poly1305_state, 

199 header: bytes, 

200 key: bytes, 

201) -> None: 

202 """ 

203 Initialize a crypto_secretstream_xchacha20poly1305 decryption buffer. 

204 

205 :param state: a secretstream state object 

206 :type state: crypto_secretstream_xchacha20poly1305_state 

207 :param header: must be 

208 :data:`.crypto_secretstream_xchacha20poly1305_HEADERBYTES` long 

209 :type header: bytes 

210 :param key: must be 

211 :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long 

212 :type key: bytes 

213 

214 """ 

215 ensure( 

216 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

217 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

218 raising=exc.TypeError, 

219 ) 

220 ensure( 

221 isinstance(header, bytes), 

222 "Header must be a bytes sequence", 

223 raising=exc.TypeError, 

224 ) 

225 ensure( 

226 len(header) == crypto_secretstream_xchacha20poly1305_HEADERBYTES, 

227 "Invalid header length", 

228 raising=exc.ValueError, 

229 ) 

230 ensure( 

231 isinstance(key, bytes), 

232 "Key must be a bytes sequence", 

233 raising=exc.TypeError, 

234 ) 

235 ensure( 

236 len(key) == crypto_secretstream_xchacha20poly1305_KEYBYTES, 

237 "Invalid key length", 

238 raising=exc.ValueError, 

239 ) 

240 

241 if state.tagbuf is None: 

242 state.tagbuf = ffi.new("unsigned char *") 

243 

244 rc = lib.crypto_secretstream_xchacha20poly1305_init_pull( 

245 state.statebuf, header, key 

246 ) 

247 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

248 

249 

250def crypto_secretstream_xchacha20poly1305_pull( 

251 state: crypto_secretstream_xchacha20poly1305_state, 

252 c: bytes, 

253 ad: Optional[bytes] = None, 

254) -> Tuple[bytes, int]: 

255 """ 

256 Read a decrypted message from the secret stream. 

257 

258 :param state: a secretstream state object 

259 :type state: crypto_secretstream_xchacha20poly1305_state 

260 :param c: the ciphertext to decrypt, the maximum length of an individual 

261 ciphertext is 

262 :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX` + 

263 :data:`.crypto_secretstream_xchacha20poly1305_ABYTES`. 

264 :type c: bytes 

265 :param ad: additional data to include in the authentication tag 

266 :type ad: bytes or None 

267 :return: (message, tag) 

268 :rtype: (bytes, int) 

269 

270 """ 

271 ensure( 

272 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

273 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

274 raising=exc.TypeError, 

275 ) 

276 ensure( 

277 state.tagbuf is not None, 

278 ( 

279 "State must be initialized using " 

280 "crypto_secretstream_xchacha20poly1305_init_pull" 

281 ), 

282 raising=exc.ValueError, 

283 ) 

284 ensure( 

285 isinstance(c, bytes), 

286 "Ciphertext is not bytes", 

287 raising=exc.TypeError, 

288 ) 

289 ensure( 

290 len(c) >= crypto_secretstream_xchacha20poly1305_ABYTES, 

291 "Ciphertext is too short", 

292 raising=exc.ValueError, 

293 ) 

294 ensure( 

295 len(c) 

296 <= ( 

297 crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX 

298 + crypto_secretstream_xchacha20poly1305_ABYTES 

299 ), 

300 "Ciphertext is too long", 

301 raising=exc.ValueError, 

302 ) 

303 ensure( 

304 ad is None or isinstance(ad, bytes), 

305 "Additional data must be bytes or None", 

306 raising=exc.TypeError, 

307 ) 

308 

309 mlen = len(c) - crypto_secretstream_xchacha20poly1305_ABYTES 

310 if state.rawbuf is None or len(state.rawbuf) < mlen: 

311 state.rawbuf = ffi.new("unsigned char[]", mlen) 

312 

313 if ad is None: 

314 ad = ffi.NULL 

315 adlen = 0 

316 else: 

317 adlen = len(ad) 

318 

319 rc = lib.crypto_secretstream_xchacha20poly1305_pull( 

320 state.statebuf, 

321 state.rawbuf, 

322 ffi.NULL, 

323 state.tagbuf, 

324 c, 

325 len(c), 

326 ad, 

327 adlen, 

328 ) 

329 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

330 

331 # Cast safety: we `ensure` above that `state.tagbuf is not None`. 

332 return ( 

333 ffi.buffer(state.rawbuf, mlen)[:], 

334 int(cast(bytes, state.tagbuf)[0]), 

335 ) 

336 

337 

338def crypto_secretstream_xchacha20poly1305_rekey( 

339 state: crypto_secretstream_xchacha20poly1305_state, 

340) -> None: 

341 """ 

342 Explicitly change the encryption key in the stream. 

343 

344 Normally the stream is re-keyed as needed or an explicit ``tag`` of 

345 :data:`.crypto_secretstream_xchacha20poly1305_TAG_REKEY` is added to a 

346 message to ensure forward secrecy, but this method can be used instead 

347 if the re-keying is controlled without adding the tag. 

348 

349 :param state: a secretstream state object 

350 :type state: crypto_secretstream_xchacha20poly1305_state 

351 

352 """ 

353 ensure( 

354 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

355 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

356 raising=exc.TypeError, 

357 ) 

358 lib.crypto_secretstream_xchacha20poly1305_rekey(state.statebuf)