Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/fernet.py: 26%

136 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# This file is dual licensed under the terms of the Apache License, Version 

2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 

3# for complete details. 

4 

5 

6import base64 

7import binascii 

8import os 

9import time 

10import typing 

11 

12from cryptography import utils 

13from cryptography.exceptions import InvalidSignature 

14from cryptography.hazmat.primitives import hashes, padding 

15from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

16from cryptography.hazmat.primitives.hmac import HMAC 

17 

18 

class InvalidToken(Exception):
    """Raised when a Fernet token is rejected.

    Code in this module raises it when a token cannot be base64-decoded,
    lacks the expected version byte or timestamp, fails the TTL or
    clock-skew check, fails HMAC verification, or cannot be decrypted and
    unpadded.
    """

21 

22 

# Largest number of seconds a token's timestamp may lie ahead of the
# verifier's current time before a TTL-checked decrypt rejects the token.
_MAX_CLOCK_SKEW = 60

24 

25 

class Fernet:
    """Authenticated symmetric encryption producing URL-safe tokens.

    A token is the URL-safe base64 encoding of::

        0x80 || 8-byte big-endian timestamp || 16-byte IV || ciphertext || HMAC

    The ciphertext is AES-CBC over the PKCS7-padded plaintext, and the
    trailing 32 bytes are an HMAC-SHA256 over everything that precedes them.
    """

    def __init__(
        self,
        key: typing.Union[bytes, str],
        backend: typing.Any = None,
    ):
        """Split a base64-encoded 32-byte key into signing/encryption halves.

        :param key: URL-safe base64 encoding of exactly 32 bytes. The first
            16 decoded bytes become the HMAC key, the last 16 the AES key.
        :param backend: Accepted for compatibility; not used by this class.
        :raises ValueError: If ``key`` is not valid base64 or does not
            decode to 32 bytes.
        """
        try:
            key = base64.urlsafe_b64decode(key)
        except binascii.Error as exc:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            ) from exc
        if len(key) != 32:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            )

        self._signing_key = key[:16]
        self._encryption_key = key[16:]

    @classmethod
    def generate_key(cls) -> bytes:
        """Return a fresh key: 32 bytes from ``os.urandom``, base64-encoded."""
        return base64.urlsafe_b64encode(os.urandom(32))

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt ``data`` into a token stamped with the current time."""
        return self.encrypt_at_time(data, int(time.time()))

    def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
        """Encrypt ``data`` into a token stamped with ``current_time``.

        A new random 16-byte IV is drawn for every call.
        """
        iv = os.urandom(16)
        return self._encrypt_from_parts(data, current_time, iv)

    def _encrypt_from_parts(
        self, data: bytes, current_time: int, iv: bytes
    ) -> bytes:
        """Build a token from explicit plaintext, timestamp and IV.

        :param data: Plaintext; validated by ``utils._check_bytes``.
        :param current_time: Timestamp stored in the token as an 8-byte
            big-endian integer.
        :param iv: Initialization vector for AES-CBC.
        :returns: The URL-safe base64-encoded token.
        """
        utils._check_bytes("data", data)

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        encryptor = Cipher(
            algorithms.AES(self._encryption_key),
            modes.CBC(iv),
        ).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        # Everything covered by the HMAC: version byte, timestamp, IV and
        # ciphertext.
        basic_parts = (
            b"\x80"
            + current_time.to_bytes(length=8, byteorder="big")
            + iv
            + ciphertext
        )

        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(basic_parts)
        signature = h.finalize()
        return base64.urlsafe_b64encode(basic_parts + signature)

    def decrypt(
        self, token: typing.Union[bytes, str], ttl: typing.Optional[int] = None
    ) -> bytes:
        """Verify and decrypt ``token``.

        :param token: Token as ``bytes`` or ``str``.
        :param ttl: If given, the maximum token age in seconds measured
            against the current time; ``None`` disables the age check.
        :returns: The original plaintext.
        :raises TypeError: If ``token`` is neither ``bytes`` nor ``str``.
        :raises InvalidToken: If the token is malformed, expired, or fails
            verification or decryption.
        """
        timestamp, data = Fernet._get_unverified_token_data(token)
        # The clock is only read when a TTL actually has to be enforced.
        time_info = None if ttl is None else (ttl, int(time.time()))
        return self._decrypt_data(data, timestamp, time_info)

    def decrypt_at_time(
        self, token: typing.Union[bytes, str], ttl: int, current_time: int
    ) -> bytes:
        """Verify and decrypt ``token`` as if the time were ``current_time``.

        :raises ValueError: If ``ttl`` is ``None``.
        :raises TypeError: If ``token`` is neither ``bytes`` nor ``str``.
        :raises InvalidToken: If the token is malformed, expired, or fails
            verification or decryption.
        """
        if ttl is None:
            raise ValueError(
                "decrypt_at_time() can only be used with a non-None ttl"
            )
        timestamp, data = Fernet._get_unverified_token_data(token)
        return self._decrypt_data(data, timestamp, (ttl, current_time))

    def extract_timestamp(self, token: typing.Union[bytes, str]) -> int:
        """Return the timestamp stored in ``token`` after checking its HMAC.

        :raises InvalidToken: If the token is malformed or its signature
            does not verify.
        """
        timestamp, data = Fernet._get_unverified_token_data(token)
        # Verify the token was not tampered with.
        self._verify_signature(data)
        return timestamp

    @staticmethod
    def _get_unverified_token_data(
        token: typing.Union[bytes, str]
    ) -> typing.Tuple[int, bytes]:
        """Decode ``token`` and read its timestamp without authenticating it.

        :returns: ``(timestamp, raw_token_bytes)``.
        :raises TypeError: If ``token`` is neither ``bytes`` nor ``str``.
        :raises InvalidToken: If the token is not valid base64, does not
            start with the ``0x80`` version byte, or is shorter than the
            version byte plus the 8-byte timestamp.
        """
        if not isinstance(token, (str, bytes)):
            raise TypeError("token must be bytes or str")

        try:
            data = base64.urlsafe_b64decode(token)
        except (TypeError, ValueError):
            # binascii.Error is a ValueError subclass, so malformed base64
            # is still covered. Catching ValueError also handles the plain
            # ValueError raised for a str containing non-ASCII characters,
            # which previously escaped to the caller.
            raise InvalidToken

        if not data or data[0] != 0x80:
            raise InvalidToken

        if len(data) < 9:
            raise InvalidToken

        timestamp = int.from_bytes(data[1:9], byteorder="big")
        return timestamp, data

    def _verify_signature(self, data: bytes) -> None:
        """Check the trailing 32-byte HMAC-SHA256 of ``data``.

        :raises InvalidToken: If the HMAC computed over ``data[:-32]`` does
            not match ``data[-32:]``.
        """
        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(data[:-32])
        try:
            h.verify(data[-32:])
        except InvalidSignature:
            raise InvalidToken

    def _decrypt_data(
        self,
        data: bytes,
        timestamp: int,
        time_info: typing.Optional[typing.Tuple[int, int]],
    ) -> bytes:
        """Enforce the TTL (if any), authenticate, then decrypt ``data``.

        :param data: Raw decoded token bytes.
        :param timestamp: Timestamp previously parsed from ``data``.
        :param time_info: ``(ttl, current_time)`` to enforce an age limit,
            or ``None`` to skip the time checks.
        :returns: The unpadded plaintext.
        :raises InvalidToken: If the token is too old, is stamped more than
            ``_MAX_CLOCK_SKEW`` seconds in the future, fails HMAC
            verification, or cannot be decrypted and unpadded.
        """
        if time_info is not None:
            ttl, current_time = time_info
            if timestamp + ttl < current_time:
                raise InvalidToken

            if current_time + _MAX_CLOCK_SKEW < timestamp:
                raise InvalidToken

        self._verify_signature(data)

        # Layout after the version byte and timestamp: 16-byte IV, then
        # ciphertext up to the trailing 32-byte HMAC.
        iv = data[9:25]
        ciphertext = data[25:-32]
        decryptor = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv)
        ).decryptor()
        plaintext_padded = decryptor.update(ciphertext)
        try:
            plaintext_padded += decryptor.finalize()
        except ValueError:
            raise InvalidToken
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()

        unpadded = unpadder.update(plaintext_padded)
        try:
            unpadded += unpadder.finalize()
        except ValueError:
            raise InvalidToken
        return unpadded

171 

172 

class MultiFernet:
    """Fernet-style interface over an ordered list of ``Fernet`` instances.

    Encryption always uses the first instance. Decryption tries each
    instance in order and returns the first successful result.
    """

    def __init__(self, fernets: typing.Iterable[Fernet]):
        """Store the given instances in iteration order.

        :raises ValueError: If ``fernets`` yields no instances.
        """
        instances = list(fernets)
        if not instances:
            raise ValueError(
                "MultiFernet requires at least one Fernet instance"
            )
        self._fernets = instances

    def encrypt(self, msg: bytes) -> bytes:
        """Encrypt ``msg`` with the first instance at the current time."""
        now = int(time.time())
        return self.encrypt_at_time(msg, now)

    def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
        """Encrypt ``msg`` with the first instance at ``current_time``."""
        primary = self._fernets[0]
        return primary.encrypt_at_time(msg, current_time)

    def rotate(self, msg: typing.Union[bytes, str]) -> bytes:
        """Re-encrypt token ``msg`` under the first instance.

        The token is decrypted, without any TTL check, by the first
        instance that accepts it. The plaintext is then encrypted again by
        the first instance with a new random IV and the token's original
        timestamp.

        :raises InvalidToken: If no instance can decrypt the token.
        """
        timestamp, data = Fernet._get_unverified_token_data(msg)
        for candidate in self._fernets:
            try:
                plaintext = candidate._decrypt_data(data, timestamp, None)
            except InvalidToken:
                continue
            break
        else:
            # Every instance rejected the token.
            raise InvalidToken

        fresh_iv = os.urandom(16)
        primary = self._fernets[0]
        return primary._encrypt_from_parts(plaintext, timestamp, fresh_iv)

    def decrypt(
        self, msg: typing.Union[bytes, str], ttl: typing.Optional[int] = None
    ) -> bytes:
        """Return the plaintext from the first instance that accepts ``msg``.

        :raises InvalidToken: If every instance rejects the token.
        """
        for candidate in self._fernets:
            try:
                return candidate.decrypt(msg, ttl)
            except InvalidToken:
                continue
        raise InvalidToken

    def decrypt_at_time(
        self, msg: typing.Union[bytes, str], ttl: int, current_time: int
    ) -> bytes:
        """Like :meth:`decrypt`, but evaluated at an explicit ``current_time``.

        :raises InvalidToken: If every instance rejects the token.
        """
        for candidate in self._fernets:
            try:
                return candidate.decrypt_at_time(msg, ttl, current_time)
            except InvalidToken:
                continue
        raise InvalidToken

211 

212 def decrypt_at_time( 

213 self, msg: typing.Union[bytes, str], ttl: int, current_time: int 

214 ) -> bytes: 

215 for f in self._fernets: 

216 try: 

217 return f.decrypt_at_time(msg, ttl, current_time) 

218 except InvalidToken: 

219 pass 

220 raise InvalidToken