Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/PyNaCl-1.6.0.dev1-py3.8-linux-x86_64.egg/nacl/bindings/crypto_secretstream.py: 29%

76 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-06 06:06 +0000

1# Copyright 2013-2018 Donald Stufft and individual contributors 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14from typing import ByteString, Optional, Tuple, cast 

15 

16from nacl import exceptions as exc 

17from nacl._sodium import ffi, lib 

18from nacl.exceptions import ensure 

19 

20 

21crypto_secretstream_xchacha20poly1305_ABYTES: int = ( 

22 lib.crypto_secretstream_xchacha20poly1305_abytes() 

23) 

24crypto_secretstream_xchacha20poly1305_HEADERBYTES: int = ( 

25 lib.crypto_secretstream_xchacha20poly1305_headerbytes() 

26) 

27crypto_secretstream_xchacha20poly1305_KEYBYTES: int = ( 

28 lib.crypto_secretstream_xchacha20poly1305_keybytes() 

29) 

30crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX: int = ( 

31 lib.crypto_secretstream_xchacha20poly1305_messagebytes_max() 

32) 

33crypto_secretstream_xchacha20poly1305_STATEBYTES: int = ( 

34 lib.crypto_secretstream_xchacha20poly1305_statebytes() 

35) 

36 

37 

38crypto_secretstream_xchacha20poly1305_TAG_MESSAGE: int = ( 

39 lib.crypto_secretstream_xchacha20poly1305_tag_message() 

40) 

41crypto_secretstream_xchacha20poly1305_TAG_PUSH: int = ( 

42 lib.crypto_secretstream_xchacha20poly1305_tag_push() 

43) 

44crypto_secretstream_xchacha20poly1305_TAG_REKEY: int = ( 

45 lib.crypto_secretstream_xchacha20poly1305_tag_rekey() 

46) 

47crypto_secretstream_xchacha20poly1305_TAG_FINAL: int = ( 

48 lib.crypto_secretstream_xchacha20poly1305_tag_final() 

49) 

50 

51 

52def crypto_secretstream_xchacha20poly1305_keygen() -> bytes: 

53 """ 

54 Generate a key for use with 

55 :func:`.crypto_secretstream_xchacha20poly1305_init_push`. 

56 

57 """ 

58 keybuf = ffi.new( 

59 "unsigned char[]", 

60 crypto_secretstream_xchacha20poly1305_KEYBYTES, 

61 ) 

62 lib.crypto_secretstream_xchacha20poly1305_keygen(keybuf) 

63 return ffi.buffer(keybuf)[:] 

64 

65 

66class crypto_secretstream_xchacha20poly1305_state: 

67 """ 

68 An object wrapping the crypto_secretstream_xchacha20poly1305 state. 

69 

70 """ 

71 

72 __slots__ = ["statebuf", "rawbuf", "tagbuf"] 

73 

74 def __init__(self) -> None: 

75 """Initialize a clean state object.""" 

76 self.statebuf: ByteString = ffi.new( 

77 "unsigned char[]", 

78 crypto_secretstream_xchacha20poly1305_STATEBYTES, 

79 ) 

80 

81 self.rawbuf: Optional[ByteString] = None 

82 self.tagbuf: Optional[ByteString] = None 

83 

84 

85def crypto_secretstream_xchacha20poly1305_init_push( 

86 state: crypto_secretstream_xchacha20poly1305_state, key: bytes 

87) -> bytes: 

88 """ 

89 Initialize a crypto_secretstream_xchacha20poly1305 encryption buffer. 

90 

91 :param state: a secretstream state object 

92 :type state: crypto_secretstream_xchacha20poly1305_state 

93 :param key: must be 

94 :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long 

95 :type key: bytes 

96 :return: header 

97 :rtype: bytes 

98 

99 """ 

100 ensure( 

101 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

102 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

103 raising=exc.TypeError, 

104 ) 

105 ensure( 

106 isinstance(key, bytes), 

107 "Key must be a bytes sequence", 

108 raising=exc.TypeError, 

109 ) 

110 ensure( 

111 len(key) == crypto_secretstream_xchacha20poly1305_KEYBYTES, 

112 "Invalid key length", 

113 raising=exc.ValueError, 

114 ) 

115 

116 headerbuf = ffi.new( 

117 "unsigned char []", 

118 crypto_secretstream_xchacha20poly1305_HEADERBYTES, 

119 ) 

120 

121 rc = lib.crypto_secretstream_xchacha20poly1305_init_push( 

122 state.statebuf, headerbuf, key 

123 ) 

124 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

125 

126 return ffi.buffer(headerbuf)[:] 

127 

128 

129def crypto_secretstream_xchacha20poly1305_push( 

130 state: crypto_secretstream_xchacha20poly1305_state, 

131 m: bytes, 

132 ad: Optional[bytes] = None, 

133 tag: int = crypto_secretstream_xchacha20poly1305_TAG_MESSAGE, 

134) -> bytes: 

135 """ 

136 Add an encrypted message to the secret stream. 

137 

138 :param state: a secretstream state object 

139 :type state: crypto_secretstream_xchacha20poly1305_state 

140 :param m: the message to encrypt, the maximum length of an individual 

141 message is 

142 :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX`. 

143 :type m: bytes 

144 :param ad: additional data to include in the authentication tag 

145 :type ad: bytes or None 

146 :param tag: the message tag, usually 

147 :data:`.crypto_secretstream_xchacha20poly1305_TAG_MESSAGE` or 

148 :data:`.crypto_secretstream_xchacha20poly1305_TAG_FINAL`. 

149 :type tag: int 

150 :return: ciphertext 

151 :rtype: bytes 

152 

153 """ 

154 ensure( 

155 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

156 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

157 raising=exc.TypeError, 

158 ) 

159 ensure(isinstance(m, bytes), "Message is not bytes", raising=exc.TypeError) 

160 ensure( 

161 len(m) <= crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX, 

162 "Message is too long", 

163 raising=exc.ValueError, 

164 ) 

165 ensure( 

166 ad is None or isinstance(ad, bytes), 

167 "Additional data must be bytes or None", 

168 raising=exc.TypeError, 

169 ) 

170 

171 clen = len(m) + crypto_secretstream_xchacha20poly1305_ABYTES 

172 if state.rawbuf is None or len(state.rawbuf) < clen: 

173 state.rawbuf = ffi.new("unsigned char[]", clen) 

174 

175 if ad is None: 

176 ad = ffi.NULL 

177 adlen = 0 

178 else: 

179 adlen = len(ad) 

180 

181 rc = lib.crypto_secretstream_xchacha20poly1305_push( 

182 state.statebuf, 

183 state.rawbuf, 

184 ffi.NULL, 

185 m, 

186 len(m), 

187 ad, 

188 adlen, 

189 tag, 

190 ) 

191 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

192 

193 return ffi.buffer(state.rawbuf, clen)[:] 

194 

195 

196def crypto_secretstream_xchacha20poly1305_init_pull( 

197 state: crypto_secretstream_xchacha20poly1305_state, 

198 header: bytes, 

199 key: bytes, 

200) -> None: 

201 """ 

202 Initialize a crypto_secretstream_xchacha20poly1305 decryption buffer. 

203 

204 :param state: a secretstream state object 

205 :type state: crypto_secretstream_xchacha20poly1305_state 

206 :param header: must be 

207 :data:`.crypto_secretstream_xchacha20poly1305_HEADERBYTES` long 

208 :type header: bytes 

209 :param key: must be 

210 :data:`.crypto_secretstream_xchacha20poly1305_KEYBYTES` long 

211 :type key: bytes 

212 

213 """ 

214 ensure( 

215 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

216 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

217 raising=exc.TypeError, 

218 ) 

219 ensure( 

220 isinstance(header, bytes), 

221 "Header must be a bytes sequence", 

222 raising=exc.TypeError, 

223 ) 

224 ensure( 

225 len(header) == crypto_secretstream_xchacha20poly1305_HEADERBYTES, 

226 "Invalid header length", 

227 raising=exc.ValueError, 

228 ) 

229 ensure( 

230 isinstance(key, bytes), 

231 "Key must be a bytes sequence", 

232 raising=exc.TypeError, 

233 ) 

234 ensure( 

235 len(key) == crypto_secretstream_xchacha20poly1305_KEYBYTES, 

236 "Invalid key length", 

237 raising=exc.ValueError, 

238 ) 

239 

240 if state.tagbuf is None: 

241 state.tagbuf = ffi.new("unsigned char *") 

242 

243 rc = lib.crypto_secretstream_xchacha20poly1305_init_pull( 

244 state.statebuf, header, key 

245 ) 

246 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

247 

248 

249def crypto_secretstream_xchacha20poly1305_pull( 

250 state: crypto_secretstream_xchacha20poly1305_state, 

251 c: bytes, 

252 ad: Optional[bytes] = None, 

253) -> Tuple[bytes, int]: 

254 """ 

255 Read a decrypted message from the secret stream. 

256 

257 :param state: a secretstream state object 

258 :type state: crypto_secretstream_xchacha20poly1305_state 

259 :param c: the ciphertext to decrypt, the maximum length of an individual 

260 ciphertext is 

261 :data:`.crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX` + 

262 :data:`.crypto_secretstream_xchacha20poly1305_ABYTES`. 

263 :type c: bytes 

264 :param ad: additional data to include in the authentication tag 

265 :type ad: bytes or None 

266 :return: (message, tag) 

267 :rtype: (bytes, int) 

268 

269 """ 

270 ensure( 

271 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

272 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

273 raising=exc.TypeError, 

274 ) 

275 ensure( 

276 state.tagbuf is not None, 

277 ( 

278 "State must be initialized using " 

279 "crypto_secretstream_xchacha20poly1305_init_pull" 

280 ), 

281 raising=exc.ValueError, 

282 ) 

283 ensure( 

284 isinstance(c, bytes), 

285 "Ciphertext is not bytes", 

286 raising=exc.TypeError, 

287 ) 

288 ensure( 

289 len(c) >= crypto_secretstream_xchacha20poly1305_ABYTES, 

290 "Ciphertext is too short", 

291 raising=exc.ValueError, 

292 ) 

293 ensure( 

294 len(c) 

295 <= ( 

296 crypto_secretstream_xchacha20poly1305_MESSAGEBYTES_MAX 

297 + crypto_secretstream_xchacha20poly1305_ABYTES 

298 ), 

299 "Ciphertext is too long", 

300 raising=exc.ValueError, 

301 ) 

302 ensure( 

303 ad is None or isinstance(ad, bytes), 

304 "Additional data must be bytes or None", 

305 raising=exc.TypeError, 

306 ) 

307 

308 mlen = len(c) - crypto_secretstream_xchacha20poly1305_ABYTES 

309 if state.rawbuf is None or len(state.rawbuf) < mlen: 

310 state.rawbuf = ffi.new("unsigned char[]", mlen) 

311 

312 if ad is None: 

313 ad = ffi.NULL 

314 adlen = 0 

315 else: 

316 adlen = len(ad) 

317 

318 rc = lib.crypto_secretstream_xchacha20poly1305_pull( 

319 state.statebuf, 

320 state.rawbuf, 

321 ffi.NULL, 

322 state.tagbuf, 

323 c, 

324 len(c), 

325 ad, 

326 adlen, 

327 ) 

328 ensure(rc == 0, "Unexpected failure", raising=exc.RuntimeError) 

329 

330 # Cast safety: we `ensure` above that `state.tagbuf is not None`. 

331 return ( 

332 ffi.buffer(state.rawbuf, mlen)[:], 

333 int(cast(bytes, state.tagbuf)[0]), 

334 ) 

335 

336 

337def crypto_secretstream_xchacha20poly1305_rekey( 

338 state: crypto_secretstream_xchacha20poly1305_state, 

339) -> None: 

340 """ 

341 Explicitly change the encryption key in the stream. 

342 

343 Normally the stream is re-keyed as needed or an explicit ``tag`` of 

344 :data:`.crypto_secretstream_xchacha20poly1305_TAG_REKEY` is added to a 

345 message to ensure forward secrecy, but this method can be used instead 

346 if the re-keying is controlled without adding the tag. 

347 

348 :param state: a secretstream state object 

349 :type state: crypto_secretstream_xchacha20poly1305_state 

350 

351 """ 

352 ensure( 

353 isinstance(state, crypto_secretstream_xchacha20poly1305_state), 

354 "State must be a crypto_secretstream_xchacha20poly1305_state object", 

355 raising=exc.TypeError, 

356 ) 

357 lib.crypto_secretstream_xchacha20poly1305_rekey(state.statebuf)