Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/primitives/ciphers/base.py: 42%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

140 statements  

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import abc 

8import typing 

9 

10from cryptography.exceptions import ( 

11 AlreadyFinalized, 

12 AlreadyUpdated, 

13 NotYetFinalized, 

14) 

15from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm 

16from cryptography.hazmat.primitives.ciphers import modes 

17 

18if typing.TYPE_CHECKING: 

19 from cryptography.hazmat.backends.openssl.ciphers import ( 

20 _CipherContext as _BackendCipherContext, 

21 ) 

22 

23 

class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for a symmetric cipher operation in progress.

    Subclasses must provide ``update``, ``update_into`` and ``finalize``;
    the class cannot be instantiated until all three are implemented.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Runs ``data`` through the cipher and hands back the output as
        bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Runs ``data`` through the cipher, placing the output in ``buf``
        rather than returning it. The return value is the count of bytes
        written.

        NOTE(review): ``buf`` is annotated ``bytes`` although it is written
        to; presumably a writable buffer is expected -- confirm.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Processes the final block and returns its output as bytes.
        """

44 

45 

class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    Cipher context interface extended with support for additional
    authenticated data.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Feeds ``data`` into the authentication computation. Nothing is
        returned.
        """

52 

53 

class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for decryption, where the authentication tag
    may be supplied at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Processes the final block and returns its output as bytes, taking
        the authentication tag at this point instead of earlier.
        """

61 

62 

class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for encryption, which exposes the resulting
    authentication tag.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        The authentication tag as bytes. It can only be read once
        encryption has been finalized.
        """

71 

72 

# Type parameter of ``Cipher``: the mode object the cipher is built with.
# The bound includes ``None`` because ``Cipher`` accepts ``mode=None``.
Mode = typing.TypeVar(
    "Mode",
    bound=typing.Union[modes.Mode, None],
    covariant=True,
)

76 

77 

class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an optional mode and creates encryption
    and decryption contexts for that pairing.

    Contexts come from the OpenSSL backend, which is imported lazily inside
    ``encryptor`` and ``decryptor``.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validates and stores the algorithm/mode pair.

        ``backend`` is accepted but never read by this constructor.

        Raises ``TypeError`` when ``algorithm`` is not a
        ``CipherAlgorithm``. A non-``None`` ``mode`` is asked to validate
        itself against ``algorithm`` and may raise from there.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # The assert exists so mypy can narrow the generic type
            # parameter to a concrete modes.Mode before the call below.
            assert isinstance(mode, modes.Mode)
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    @typing.overload
    def encryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADEncryptionContext:
        ...

    @typing.overload
    def encryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext:
        ...

    def encryptor(self):
        """
        Builds an encryption context for the stored algorithm and mode.

        Raises ``ValueError`` when the mode is a
        ``ModeWithAuthenticationTag`` whose ``tag`` is already set.
        """
        mode = self.mode
        if (
            isinstance(mode, modes.ModeWithAuthenticationTag)
            and mode.tag is not None
        ):
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )

        # Imported here, after the tag check, rather than at module level.
        from cryptography.hazmat.backends.openssl.backend import backend

        backend_ctx = backend.create_symmetric_encryption_ctx(
            self.algorithm, mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=True)

    @typing.overload
    def decryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADDecryptionContext:
        ...

    @typing.overload
    def decryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext:
        ...

    def decryptor(self):
        """
        Builds a decryption context for the stored algorithm and mode.
        """
        from cryptography.hazmat.backends.openssl.backend import backend

        backend_ctx = backend.create_symmetric_decryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=False)

    def _wrap_ctx(
        self, ctx: _BackendCipherContext, encrypt: bool
    ) -> AEADEncryptionContext | AEADDecryptionContext | CipherContext:
        """
        Wraps a backend context in the matching public context class.

        Non-AEAD modes (including ``None``) get a ``_CipherContext``;
        modes with an authentication tag get the AEAD wrapper for the
        direction selected by ``encrypt``.
        """
        if not isinstance(self.mode, modes.ModeWithAuthenticationTag):
            return _CipherContext(ctx)
        if encrypt:
            return _AEADEncryptionContext(ctx)
        return _AEADDecryptionContext(ctx)

152 

153 

# ``Cipher`` parameterised over the listed mode types (and ``None``). It is
# used as the ``self`` annotation of the ``encryptor``/``decryptor``
# overloads that return a plain ``CipherContext``.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        None,
        modes.ECB,
        modes.ModeWithInitializationVector,
    ]
]

163 

164 

class _CipherContext(CipherContext):
    """
    Concrete ``CipherContext`` that forwards every call to a backend
    context.

    After ``finalize`` the backend context is dropped, and every later call
    raises ``AlreadyFinalized``.
    """

    # Backend context; None once finalize() has completed.
    _ctx: _BackendCipherContext | None

    def __init__(self, ctx: _BackendCipherContext) -> None:
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """
        Forwards ``data`` to the backend context and returns its output.

        Raises ``AlreadyFinalized`` if the context has been finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Forwards ``data`` and ``buf`` to the backend context's
        ``update_into`` and returns its result.

        Raises ``AlreadyFinalized`` if the context has been finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return backend_ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, releases it and returns the final
        output.

        Raises ``AlreadyFinalized`` if the context has been finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        final_output = backend_ctx.finalize()
        # Dropping the reference is what marks this context as finalized.
        self._ctx = None
        return final_output

187 

188 

class _AEADCipherContext(AEADCipherContext):
    """
    Shared implementation behind the AEAD encryption and decryption
    contexts.

    It forwards to a backend context while counting the data and AAD bytes
    it has seen, and enforces the limits exposed on the backend context's
    mode (``_MAX_ENCRYPTED_BYTES`` and ``_MAX_AAD_BYTES``). AAD is rejected
    once ``update`` or ``update_into`` has been called.
    """

    # Backend context; None once the context has been finalized.
    _ctx: _BackendCipherContext | None
    # Tag copied from the backend context at finalization; None before.
    _tag: bytes | None

    def __init__(self, ctx: _BackendCipherContext) -> None:
        self._ctx = ctx
        self._bytes_processed = 0
        self._aad_bytes_processed = 0
        self._tag = None
        self._updated = False

    def _check_limit(self, data_size: int) -> None:
        """
        Records ``data_size`` more processed bytes and enforces the mode's
        data limit.

        The context is marked as updated and the counter is advanced before
        the limit is compared, so both changes persist even when
        ``ValueError`` is raised.

        Raises ``AlreadyFinalized`` if the context has been finalized, and
        ``ValueError`` if the running total exceeds the mode's limit.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")

        self._updated = True
        self._bytes_processed += data_size

        mode = backend_ctx._mode
        limit = mode._MAX_ENCRYPTED_BYTES
        if self._bytes_processed <= limit:
            return
        raise ValueError(
            "{} has a maximum encrypted byte limit of {}".format(
                mode.name, limit
            )
        )

    def update(self, data: bytes) -> bytes:
        """
        Accounts for ``len(data)`` bytes, then forwards ``data`` to the
        backend context and returns its output.
        """
        self._check_limit(len(data))
        backend_ctx = self._ctx
        # _check_limit already rejected None; the assert is for mypy only.
        assert backend_ctx is not None
        return backend_ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Accounts for ``len(data)`` bytes, then forwards ``data`` and
        ``buf`` to the backend context's ``update_into``.
        """
        self._check_limit(len(data))
        backend_ctx = self._ctx
        # _check_limit already rejected None; the assert is for mypy only.
        assert backend_ctx is not None
        return backend_ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, keeps its tag, releases it and
        returns the final output.

        Raises ``AlreadyFinalized`` if the context has been finalized.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        final_output = backend_ctx.finalize()
        # The tag is read after finalize() and before the context is
        # dropped.
        self._tag = backend_ctx.tag
        self._ctx = None
        return final_output

    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Accounts for ``len(data)`` AAD bytes and forwards ``data`` to the
        backend context.

        Raises ``AlreadyFinalized`` if the context has been finalized,
        ``AlreadyUpdated`` if data has already been processed, and
        ``ValueError`` if the running AAD total exceeds the mode's limit.
        The AAD counter is advanced before the limit is compared.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        self._aad_bytes_processed += len(data)

        mode = backend_ctx._mode
        aad_limit = mode._MAX_AAD_BYTES
        if self._aad_bytes_processed > aad_limit:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    mode.name, aad_limit
                )
            )

        backend_ctx.authenticate_additional_data(data)

247 

248 

class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext):
    """
    AEAD decryption context whose tag can be supplied at finalization.
    """

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Finalizes the backend context with ``tag``, keeps the backend's
        tag, releases the backend context and returns the final output.

        Raises ``AlreadyFinalized`` if the context has been finalized, and
        ``ValueError`` if the backend context already holds a tag.
        """
        backend_ctx = self._ctx
        if backend_ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if backend_ctx._tag is not None:
            raise ValueError(
                "tag provided both in mode and in call with finalize_with_tag:"
                " tag should only be provided once"
            )
        final_output = backend_ctx.finalize_with_tag(tag)
        self._tag = backend_ctx.tag
        self._ctx = None
        return final_output

262 

263 

class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext):
    """
    AEAD encryption context that exposes the tag after finalization.
    """

    @property
    def tag(self) -> bytes:
        """
        The tag stored at finalization.

        Raises ``NotYetFinalized`` while the backend context is still held,
        i.e. before finalization.
        """
        if self._ctx is None:
            stored_tag = self._tag
            # The assert narrows the optional attribute for the declared
            # return type.
            assert stored_tag is not None
            return stored_tag
        raise NotYetFinalized(
            "You must finalize encryption before " "getting the tag."
        )

272 return self._tag