Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/hazmat/primitives/ciphers/base.py: 43%

138 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:05 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5from __future__ import annotations 

6 

7import abc 

8import typing 

9 

10from cryptography.exceptions import ( 

11 AlreadyFinalized, 

12 AlreadyUpdated, 

13 NotYetFinalized, 

14) 

15from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm 

16from cryptography.hazmat.primitives.ciphers import modes 

17 

18if typing.TYPE_CHECKING: 

19 from cryptography.hazmat.backends.openssl.ciphers import ( 

20 _CipherContext as _BackendCipherContext, 

21 ) 

22 

23 

24class CipherContext(metaclass=abc.ABCMeta): 

25 @abc.abstractmethod 

26 def update(self, data: bytes) -> bytes: 

27 """ 

28 Processes the provided bytes through the cipher and returns the results 

29 as bytes. 

30 """ 

31 

32 @abc.abstractmethod 

33 def update_into(self, data: bytes, buf: bytes) -> int: 

34 """ 

35 Processes the provided bytes and writes the resulting data into the 

36 provided buffer. Returns the number of bytes written. 

37 """ 

38 

39 @abc.abstractmethod 

40 def finalize(self) -> bytes: 

41 """ 

42 Returns the results of processing the final block as bytes. 

43 """ 

44 

45 

46class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta): 

47 @abc.abstractmethod 

48 def authenticate_additional_data(self, data: bytes) -> None: 

49 """ 

50 Authenticates the provided bytes. 

51 """ 

52 

53 

54class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta): 

55 @abc.abstractmethod 

56 def finalize_with_tag(self, tag: bytes) -> bytes: 

57 """ 

58 Returns the results of processing the final block as bytes and allows 

59 delayed passing of the authentication tag. 

60 """ 

61 

62 

63class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta): 

64 @property 

65 @abc.abstractmethod 

66 def tag(self) -> bytes: 

67 """ 

68 Returns tag bytes. This is only available after encryption is 

69 finalized. 

70 """ 

71 

72 

73Mode = typing.TypeVar( 

74 "Mode", bound=typing.Optional[modes.Mode], covariant=True 

75) 

76 

77 

78class Cipher(typing.Generic[Mode]): 

79 def __init__( 

80 self, 

81 algorithm: CipherAlgorithm, 

82 mode: Mode, 

83 backend: typing.Any = None, 

84 ) -> None: 

85 if not isinstance(algorithm, CipherAlgorithm): 

86 raise TypeError("Expected interface of CipherAlgorithm.") 

87 

88 if mode is not None: 

89 # mypy needs this assert to narrow the type from our generic 

90 # type. Maybe it won't some time in the future. 

91 assert isinstance(mode, modes.Mode) 

92 mode.validate_for_algorithm(algorithm) 

93 

94 self.algorithm = algorithm 

95 self.mode = mode 

96 

97 @typing.overload 

98 def encryptor( 

99 self: Cipher[modes.ModeWithAuthenticationTag], 

100 ) -> AEADEncryptionContext: 

101 ... 

102 

103 @typing.overload 

104 def encryptor( 

105 self: _CIPHER_TYPE, 

106 ) -> CipherContext: 

107 ... 

108 

109 def encryptor(self): 

110 if isinstance(self.mode, modes.ModeWithAuthenticationTag): 

111 if self.mode.tag is not None: 

112 raise ValueError( 

113 "Authentication tag must be None when encrypting." 

114 ) 

115 from cryptography.hazmat.backends.openssl.backend import backend 

116 

117 ctx = backend.create_symmetric_encryption_ctx( 

118 self.algorithm, self.mode 

119 ) 

120 return self._wrap_ctx(ctx, encrypt=True) 

121 

122 @typing.overload 

123 def decryptor( 

124 self: Cipher[modes.ModeWithAuthenticationTag], 

125 ) -> AEADDecryptionContext: 

126 ... 

127 

128 @typing.overload 

129 def decryptor( 

130 self: _CIPHER_TYPE, 

131 ) -> CipherContext: 

132 ... 

133 

134 def decryptor(self): 

135 from cryptography.hazmat.backends.openssl.backend import backend 

136 

137 ctx = backend.create_symmetric_decryption_ctx( 

138 self.algorithm, self.mode 

139 ) 

140 return self._wrap_ctx(ctx, encrypt=False) 

141 

142 def _wrap_ctx( 

143 self, ctx: _BackendCipherContext, encrypt: bool 

144 ) -> typing.Union[ 

145 AEADEncryptionContext, AEADDecryptionContext, CipherContext 

146 ]: 

147 if isinstance(self.mode, modes.ModeWithAuthenticationTag): 

148 if encrypt: 

149 return _AEADEncryptionContext(ctx) 

150 else: 

151 return _AEADDecryptionContext(ctx) 

152 else: 

153 return _CipherContext(ctx) 

154 

155 

156_CIPHER_TYPE = Cipher[ 

157 typing.Union[ 

158 modes.ModeWithNonce, 

159 modes.ModeWithTweak, 

160 None, 

161 modes.ECB, 

162 modes.ModeWithInitializationVector, 

163 ] 

164] 

165 

166 

167class _CipherContext(CipherContext): 

168 _ctx: typing.Optional[_BackendCipherContext] 

169 

170 def __init__(self, ctx: _BackendCipherContext) -> None: 

171 self._ctx = ctx 

172 

173 def update(self, data: bytes) -> bytes: 

174 if self._ctx is None: 

175 raise AlreadyFinalized("Context was already finalized.") 

176 return self._ctx.update(data) 

177 

178 def update_into(self, data: bytes, buf: bytes) -> int: 

179 if self._ctx is None: 

180 raise AlreadyFinalized("Context was already finalized.") 

181 return self._ctx.update_into(data, buf) 

182 

183 def finalize(self) -> bytes: 

184 if self._ctx is None: 

185 raise AlreadyFinalized("Context was already finalized.") 

186 data = self._ctx.finalize() 

187 self._ctx = None 

188 return data 

189 

190 

191class _AEADCipherContext(AEADCipherContext): 

192 _ctx: typing.Optional[_BackendCipherContext] 

193 _tag: typing.Optional[bytes] 

194 

195 def __init__(self, ctx: _BackendCipherContext) -> None: 

196 self._ctx = ctx 

197 self._bytes_processed = 0 

198 self._aad_bytes_processed = 0 

199 self._tag = None 

200 self._updated = False 

201 

202 def _check_limit(self, data_size: int) -> None: 

203 if self._ctx is None: 

204 raise AlreadyFinalized("Context was already finalized.") 

205 self._updated = True 

206 self._bytes_processed += data_size 

207 if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES: 

208 raise ValueError( 

209 "{} has a maximum encrypted byte limit of {}".format( 

210 self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES 

211 ) 

212 ) 

213 

214 def update(self, data: bytes) -> bytes: 

215 self._check_limit(len(data)) 

216 # mypy needs this assert even though _check_limit already checked 

217 assert self._ctx is not None 

218 return self._ctx.update(data) 

219 

220 def update_into(self, data: bytes, buf: bytes) -> int: 

221 self._check_limit(len(data)) 

222 # mypy needs this assert even though _check_limit already checked 

223 assert self._ctx is not None 

224 return self._ctx.update_into(data, buf) 

225 

226 def finalize(self) -> bytes: 

227 if self._ctx is None: 

228 raise AlreadyFinalized("Context was already finalized.") 

229 data = self._ctx.finalize() 

230 self._tag = self._ctx.tag 

231 self._ctx = None 

232 return data 

233 

234 def authenticate_additional_data(self, data: bytes) -> None: 

235 if self._ctx is None: 

236 raise AlreadyFinalized("Context was already finalized.") 

237 if self._updated: 

238 raise AlreadyUpdated("Update has been called on this context.") 

239 

240 self._aad_bytes_processed += len(data) 

241 if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES: 

242 raise ValueError( 

243 "{} has a maximum AAD byte limit of {}".format( 

244 self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES 

245 ) 

246 ) 

247 

248 self._ctx.authenticate_additional_data(data) 

249 

250 

251class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext): 

252 def finalize_with_tag(self, tag: bytes) -> bytes: 

253 if self._ctx is None: 

254 raise AlreadyFinalized("Context was already finalized.") 

255 data = self._ctx.finalize_with_tag(tag) 

256 self._tag = self._ctx.tag 

257 self._ctx = None 

258 return data 

259 

260 

261class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext): 

262 @property 

263 def tag(self) -> bytes: 

264 if self._ctx is not None: 

265 raise NotYetFinalized( 

266 "You must finalize encryption before " "getting the tag." 

267 ) 

268 assert self._tag is not None 

269 return self._tag