# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/cryptography/fernet.py: 26%
# 136 statements
# coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
6import base64
7import binascii
8import os
9import time
10import typing
12from cryptography import utils
13from cryptography.exceptions import InvalidSignature
14from cryptography.hazmat.primitives import hashes, padding
15from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
16from cryptography.hazmat.primitives.hmac import HMAC
class InvalidToken(Exception):
    """Raised when a Fernet token cannot be accepted.

    In this module it is raised when a token fails base64 decoding, is too
    short or lacks the 0x80 version byte, falls outside the allowed time
    window, fails HMAC verification, or fails to decrypt/unpad.
    """
# Maximum number of seconds a token's timestamp may lie ahead of the
# verifier's current time before a TTL-checked decrypt rejects the token.
_MAX_CLOCK_SKEW = 60
class Fernet:
    """Symmetric authenticated encryption using one 32-byte key.

    The key is split into a 16-byte HMAC-SHA256 signing key and a 16-byte
    AES key used in CBC mode with PKCS7 padding.

    A token is the url-safe base64 encoding of::

        0x80 | 8-byte big-endian timestamp | 16-byte IV | ciphertext | HMAC

    where the 32-byte HMAC covers every byte that precedes it.
    """

    def __init__(
        self,
        key: typing.Union[bytes, str],
        backend: typing.Any = None,
    ):
        """Decode ``key`` and split it into signing and encryption halves.

        Args:
            key: url-safe base64 encoding of exactly 32 bytes.
            backend: Not used by this class (presumably retained for API
                compatibility -- confirm before removing).

        Raises:
            ValueError: If ``key`` is not valid url-safe base64 or does not
                decode to exactly 32 bytes.
        """
        try:
            key = base64.urlsafe_b64decode(key)
        except ValueError as exc:
            # binascii.Error is a ValueError subclass; base64 also raises a
            # plain ValueError for a str containing non-ASCII characters.
            # Catching ValueError keeps the error message consistent.
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            ) from exc
        if len(key) != 32:
            raise ValueError(
                "Fernet key must be 32 url-safe base64-encoded bytes."
            )

        self._signing_key = key[:16]
        self._encryption_key = key[16:]

    @classmethod
    def generate_key(cls) -> bytes:
        """Return 32 bytes from ``os.urandom``, url-safe base64 encoded."""
        return base64.urlsafe_b64encode(os.urandom(32))

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt ``data`` into a token stamped with the current time."""
        return self.encrypt_at_time(data, int(time.time()))

    def encrypt_at_time(self, data: bytes, current_time: int) -> bytes:
        """Encrypt ``data`` with a fresh random IV and an explicit timestamp.

        Args:
            data: Plaintext bytes.
            current_time: Timestamp to embed, in whole seconds.
        """
        iv = os.urandom(16)
        return self._encrypt_from_parts(data, current_time, iv)

    def _encrypt_from_parts(
        self, data: bytes, current_time: int, iv: bytes
    ) -> bytes:
        """Build a token from plaintext, timestamp and a caller-chosen IV."""
        # NOTE(review): presumably rejects non-bytes input -- helper is
        # defined outside this file.
        utils._check_bytes("data", data)

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        encryptor = Cipher(
            algorithms.AES(self._encryption_key),
            modes.CBC(iv),
        ).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        # Everything below is covered by the HMAC: version byte, timestamp,
        # IV and ciphertext.
        basic_parts = (
            b"\x80"
            + current_time.to_bytes(length=8, byteorder="big")
            + iv
            + ciphertext
        )

        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(basic_parts)
        signature = h.finalize()
        return base64.urlsafe_b64encode(basic_parts + signature)

    def decrypt(
        self, token: typing.Union[bytes, str], ttl: typing.Optional[int] = None
    ) -> bytes:
        """Verify and decrypt ``token``.

        Args:
            token: Token produced by :meth:`encrypt`.
            ttl: If not ``None``, the token is rejected when its timestamp
                plus ``ttl`` is earlier than the current time, or when its
                timestamp is more than ``_MAX_CLOCK_SKEW`` seconds ahead of
                the current time.

        Raises:
            TypeError: If ``token`` is neither ``str`` nor ``bytes``.
            InvalidToken: If the token is malformed, outside the allowed
                time window, fails verification, or cannot be decrypted.
        """
        timestamp, data = Fernet._get_unverified_token_data(token)
        if ttl is None:
            time_info = None
        else:
            time_info = (ttl, int(time.time()))
        return self._decrypt_data(data, timestamp, time_info)

    def decrypt_at_time(
        self, token: typing.Union[bytes, str], ttl: int, current_time: int
    ) -> bytes:
        """Like :meth:`decrypt`, but checks ``ttl`` against ``current_time``.

        Raises:
            ValueError: If ``ttl`` is ``None``.
            TypeError: If ``token`` is neither ``str`` nor ``bytes``.
            InvalidToken: As for :meth:`decrypt`.
        """
        if ttl is None:
            raise ValueError(
                "decrypt_at_time() can only be used with a non-None ttl"
            )
        timestamp, data = Fernet._get_unverified_token_data(token)
        return self._decrypt_data(data, timestamp, (ttl, current_time))

    def extract_timestamp(self, token: typing.Union[bytes, str]) -> int:
        """Return the token's timestamp after checking its signature."""
        timestamp, data = Fernet._get_unverified_token_data(token)
        # Verify the token was not tampered with.
        self._verify_signature(data)
        return timestamp

    @staticmethod
    def _get_unverified_token_data(
        token: typing.Union[bytes, str]
    ) -> typing.Tuple[int, bytes]:
        """Decode ``token`` and read its timestamp without checking the HMAC.

        Returns:
            ``(timestamp, data)`` where ``data`` is the full decoded token.

        Raises:
            TypeError: If ``token`` is neither ``str`` nor ``bytes``.
            InvalidToken: If the token cannot be base64-decoded, is empty,
                does not start with 0x80, or is shorter than 9 bytes.
        """
        if not isinstance(token, (str, bytes)):
            raise TypeError("token must be bytes or str")

        try:
            data = base64.urlsafe_b64decode(token)
        except (TypeError, ValueError):
            # ValueError covers binascii.Error (malformed base64) as well as
            # the plain ValueError base64 raises for a str token containing
            # non-ASCII characters, so neither leaks past this method.
            raise InvalidToken

        if not data or data[0] != 0x80:
            raise InvalidToken

        if len(data) < 9:
            raise InvalidToken

        timestamp = int.from_bytes(data[1:9], byteorder="big")
        return timestamp, data

    def _verify_signature(self, data: bytes) -> None:
        """Check that the trailing 32 bytes are the HMAC of the rest.

        Raises:
            InvalidToken: If verification fails.
        """
        h = HMAC(self._signing_key, hashes.SHA256())
        h.update(data[:-32])
        try:
            h.verify(data[-32:])
        except InvalidSignature:
            raise InvalidToken

    def _decrypt_data(
        self,
        data: bytes,
        timestamp: int,
        time_info: typing.Optional[typing.Tuple[int, int]],
    ) -> bytes:
        """Apply the time-window check, verify the HMAC, then decrypt.

        Args:
            data: Full decoded token bytes.
            timestamp: Timestamp already parsed from ``data``.
            time_info: ``(ttl, current_time)`` or ``None`` to skip the
                time-window check.

        Raises:
            InvalidToken: On any time-window, signature, decryption or
                padding failure.
        """
        # The time window is checked first, on the not-yet-verified
        # timestamp; the signature check below still runs before decryption.
        if time_info is not None:
            ttl, current_time = time_info
            if timestamp + ttl < current_time:
                raise InvalidToken

            if current_time + _MAX_CLOCK_SKEW < timestamp:
                raise InvalidToken

        self._verify_signature(data)

        # Layout: [0] version, [1:9] timestamp, [9:25] IV, [25:-32]
        # ciphertext, [-32:] HMAC.
        iv = data[9:25]
        ciphertext = data[25:-32]
        decryptor = Cipher(
            algorithms.AES(self._encryption_key), modes.CBC(iv)
        ).decryptor()
        plaintext_padded = decryptor.update(ciphertext)
        try:
            plaintext_padded += decryptor.finalize()
        except ValueError:
            raise InvalidToken
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()

        unpadded = unpadder.update(plaintext_padded)
        try:
            unpadded += unpadder.finalize()
        except ValueError:
            raise InvalidToken
        return unpadded
class MultiFernet:
    """Key-rotation wrapper around an ordered list of Fernet instances.

    Encryption always uses the first instance; decryption tries each
    instance in order and returns the first success.
    """

    def __init__(self, fernets: typing.Iterable["Fernet"]):
        """Materialize ``fernets`` into a list.

        Raises:
            ValueError: If ``fernets`` yields no items.
        """
        fernets = list(fernets)
        if not fernets:
            raise ValueError(
                "MultiFernet requires at least one Fernet instance"
            )
        self._fernets = fernets

    def encrypt(self, msg: bytes) -> bytes:
        """Encrypt ``msg`` with the first key, stamped with the current time."""
        return self.encrypt_at_time(msg, int(time.time()))

    def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
        """Encrypt ``msg`` with the first key and an explicit timestamp."""
        return self._fernets[0].encrypt_at_time(msg, current_time)

    def rotate(self, msg: typing.Union[bytes, str]) -> bytes:
        """Re-encrypt token ``msg`` under the first key.

        The token is decrypted with the first key that accepts it (no TTL
        check), then re-encrypted with a fresh IV while keeping the token's
        original timestamp.

        Raises:
            TypeError: If ``msg`` is neither ``str`` nor ``bytes``.
            InvalidToken: If ``msg`` is malformed or no key accepts it.
        """
        timestamp, data = Fernet._get_unverified_token_data(msg)
        for f in self._fernets:
            try:
                p = f._decrypt_data(data, timestamp, None)
                break
            except InvalidToken:
                pass
        else:
            # Loop finished without a break: no key could decrypt the token.
            raise InvalidToken

        iv = os.urandom(16)
        return self._fernets[0]._encrypt_from_parts(p, timestamp, iv)

    def decrypt(
        self, msg: typing.Union[bytes, str], ttl: typing.Optional[int] = None
    ) -> bytes:
        """Decrypt ``msg`` with the first key that accepts it.

        Raises:
            InvalidToken: If every key rejects the token.
        """
        for f in self._fernets:
            try:
                return f.decrypt(msg, ttl)
            except InvalidToken:
                pass
        raise InvalidToken

    def decrypt_at_time(
        self, msg: typing.Union[bytes, str], ttl: int, current_time: int
    ) -> bytes:
        """Like :meth:`decrypt`, checking ``ttl`` against ``current_time``.

        Raises:
            InvalidToken: If every key rejects the token.
        """
        for f in self._fernets:
            try:
                return f.decrypt_at_time(msg, ttl, current_time)
            except InvalidToken:
                pass
        raise InvalidToken